scala - Explanation of the fold method of Spark RDD


I am running Spark 1.4.0 pre-built for Hadoop 2.4 (in local mode) to calculate the sum of squares of an RDD of doubles. My Scala code looks like this:

sc.parallelize(Array(2.0, 3.0)).fold(0.0)((p, v) => p + v * v)

It gave a surprising result: 97.0.

This is quite counter-intuitive compared to the Scala version of fold

Array(2.0, 3.0).fold(0.0)((p, v) => p + v * v)

which gives the expected answer, 13.0.

It seems quite likely that I have made some tricky mistake in the code due to a lack of understanding. I have read that the function used in RDD.fold() should be commutative, otherwise the result may depend on the partitions, etc. For example, if I change the number of partitions to 1,

sc.parallelize(Array(2.0, 3.0), 1).fold(0.0)((p, v) => p + v * v)

the code gives me 169.0 on my machine!

Can someone explain what exactly is happening here?

Well, it is actually pretty well explained by the official documentation:

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.

To illustrate what is going on, let's simulate it step by step:

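val rdd = sc.parallelize(Array(2.0, 3.0))

val byPartition = rdd.mapPartitions(
    iter => Array(iter.fold(0.0)((p, v) => p + v * v)).toIterator).collect()

It gives something similar to Array[Double] = Array(0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 9.0), and

byPartition.reduce((p, v) => p + v * v)

returns 97.0, that is, 4.0 * 4.0 + 9.0 * 9.0, because the per-partition results are squared again when they are merged. With a single partition the per-partition result is 13.0, and merging it gives 0.0 + 13.0 * 13.0 = 169.0.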
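Both numbers from the question can also be reproduced without Spark. The following is a minimal sketch in plain Scala, assuming the fold runs in two stages as the documentation describes. `simulateFold` and the partition layouts are hypothetical stand-ins for illustration, not part of Spark's API, and the real merge order is not guaranteed to follow partition order.

```scala
object FoldSimulation {
  // Hypothetical model of a partitioned fold: the same op and zero value
  // are used inside each partition and again when merging the results.
  def simulateFold(partitions: Seq[Seq[Double]], zero: Double)
                  (op: (Double, Double) => Double): Double = {
    // Stage 1: fold every partition on its own, starting from zero.
    val perPartition = partitions.map(_.foldLeft(zero)(op))
    // Stage 2: fold the per-partition results, again starting from zero.
    perPartition.foldLeft(zero)(op)
  }

  // The function from the question: it squares its second argument,
  // so it is neither associative nor commutative.
  val sumSq: (Double, Double) => Double = (p, v) => p + v * v

  def main(args: Array[String]): Unit = {
    // One partition: stage 1 gives 13.0, stage 2 gives 0.0 + 13.0 * 13.0.
    val one: Seq[Seq[Double]] = Seq(Seq(2.0, 3.0))
    println(simulateFold(one, 0.0)(sumSq)) // 169.0

    // Eight partitions (assumed layout matching the collected array above):
    // stage 1 gives 4.0 and 9.0, stage 2 squares them again.
    val e = Seq.empty[Double]
    val eight: Seq[Seq[Double]] = Seq(e, e, e, Seq(2.0), e, e, e, Seq(3.0))
    println(simulateFold(eight, 0.0)(sumSq)) // 97.0
  }
}
```

The op is applied to already-squared partial sums in stage 2, which is why the answer changes with the number of partitions.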

An important thing to note is that results can differ from run to run, depending on the order in which partitions are combined.
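One way to get a partition-independent sum of squares is to keep the squaring out of the merge step. The sketch below models two options in plain Scala over a hypothetical partition layout: square first and fold with plain addition, or use separate per-element and merge functions. In Spark these would roughly correspond to rdd.map(v => v * v).fold(0.0)(_ + _) and rdd.aggregate(0.0)((acc, v) => acc + v * v, _ + _); the helper names here are made up for illustration.

```scala
object SumOfSquares {
  // Option 1: square each element first, then fold with addition,
  // which is associative and commutative with 0.0 as neutral element.
  def mapThenFold(parts: Seq[Seq[Double]]): Double =
    parts
      .map(_.map(v => v * v).foldLeft(0.0)(_ + _)) // per-partition sums
      .foldLeft(0.0)(_ + _)                        // merge by addition

  // Option 2: aggregate-style, with one function for adding an element
  // to an accumulator and a different one for merging accumulators.
  def aggregateStyle(parts: Seq[Seq[Double]]): Double = {
    val seqOp: (Double, Double) => Double = (acc, v) => acc + v * v
    val combOp: (Double, Double) => Double = (a, b) => a + b
    parts.map(_.foldLeft(0.0)(seqOp)).foldLeft(0.0)(combOp)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical layouts: one partition versus three partitions.
    val one: Seq[Seq[Double]] = Seq(Seq(2.0, 3.0))
    val three: Seq[Seq[Double]] = Seq(Seq(2.0), Seq.empty[Double], Seq(3.0))

    println(mapThenFold(one))      // 13.0
    println(mapThenFold(three))    // 13.0
    println(aggregateStyle(one))   // 13.0
    println(aggregateStyle(three)) // 13.0
  }
}
```

Either way the merge step only adds partial sums, so the number of partitions no longer affects the result for this input.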

