I am running spark-1.4.0 pre-built for hadoop-2.4 (in local master mode) to calculate the sum of squares of a Double RDD. My Scala code looks like
sc.parallelize(Array(2.0, 3.0)).fold(0.0)((p, v) => p + v * v)
and it gave a surprising result of 97.0.
This is quite counter-intuitive compared to the Scala version of fold
Array(2.0, 3.0).fold(0.0)((p, v) => p + v * v)
which gives the expected answer 13.0.
It seems quite likely that I have made some tricky mistake in the code due to a lack of understanding. I have read that the function used in RDD.fold() should be commutative, otherwise the result may depend on the partitions, etc. For example, if I change the number of partitions to 1,
sc.parallelize(Array(2.0, 3.0), 1).fold(0.0)((p, v) => p + v * v)
the code gives me 169.0 on my machine!
Can someone explain what exactly happened here?
Well, it is actually pretty well explained by the official documentation:
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.
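The two phases described in the documentation can be modelled with plain Scala collections, without Spark. This is only a sketch: `simulateFold` is a hypothetical helper, not a Spark API, and the eight-partition layout (with 2.0 and 3.0 landing in the fourth and eighth partitions) is an assumption about a local master with eight cores.

```scala
// Hypothetical helper (not part of Spark): models RDD.fold on local collections.
// Phase 1 folds every partition, each starting from `zero`.
// Phase 2 folds the per-partition results with the same function, again from `zero`.
def simulateFold(partitions: Seq[Seq[Double]], zero: Double)(op: (Double, Double) => Double): Double = {
  val perPartition = partitions.map(_.fold(zero)(op))
  perPartition.fold(zero)(op)
}

val sumSquares = (p: Double, v: Double) => p + v * v

// Assumed layout for the default parallelism: most partitions are empty.
val eightPartitions: Seq[Seq[Double]] = Seq(
  Seq.empty[Double], Seq.empty[Double], Seq.empty[Double], Seq(2.0),
  Seq.empty[Double], Seq.empty[Double], Seq.empty[Double], Seq(3.0)
)
// Layout for parallelize(..., 1): everything in a single partition.
val onePartition: Seq[Seq[Double]] = Seq(Seq(2.0, 3.0))

// Partition results 4.0 and 9.0 are squared again when merged: 0 + 16 + 81.
val resultEight = simulateFold(eightPartitions, 0.0)(sumSquares)
// The single partition result 13.0 is squared when merged: 0 + 169.
val resultOne = simulateFold(onePartition, 0.0)(sumSquares)

println(s"eight partitions: $resultEight, one partition: $resultOne")
```

Under these assumptions the model reproduces both numbers from the question, 97.0 and 169.0.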
To illustrate what is going on, let's try to simulate it step by step:
val rdd = sc.parallelize(Array(2.0, 3.0))
val byPartition = rdd.mapPartitions(iter => Array(iter.fold(0.0)((p, v) => p + v * v)).toIterator).collect()
It gives something similar to this: Array[Double] = Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0), and
byPartition.reduce((p, v) => p + v * v)
returns 97.0, because the merge step applies the same function to the per-partition results and squares 4.0 and 9.0 a second time: 0 + 16 + 81 = 97.
The important thing to note is that results can differ from run to run depending on the order in which the partitions are combined.
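One way to avoid the problem, sketched here on local collections, is to square the elements first and fold with plain addition, which is associative and commutative and has 0.0 as a true neutral element. `simulateFold` is again a hypothetical helper that only models the two phases of RDD.fold, and the partition layouts are made up for illustration.

```scala
// Hypothetical helper (not part of Spark): two-phase model of RDD.fold.
def simulateFold(partitions: Seq[Seq[Double]], zero: Double)(op: (Double, Double) => Double): Double = {
  val perPartition = partitions.map(_.fold(zero)(op))
  perPartition.fold(zero)(op)
}

// Illustrative partition layouts for the same two elements.
val layouts: Seq[Seq[Seq[Double]]] = Seq(
  Seq(Seq(2.0, 3.0)),
  Seq(Seq(2.0), Seq(3.0)),
  Seq(Seq.empty[Double], Seq(3.0), Seq.empty[Double], Seq(2.0))
)

// Square each element first, then fold with addition only.
val results = layouts.map { partitions =>
  val squared = partitions.map(_.map(v => v * v))
  simulateFold(squared, 0.0)(_ + _)
}

println(results)
```

Every layout yields 13.0 in this model. On a real RDD the corresponding calls would be `rdd.map(v => v * v).fold(0.0)(_ + _)`, or `rdd.aggregate(0.0)((acc, v) => acc + v * v, _ + _)`, where `aggregate` takes a separate function for merging the partition results.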