scala - Spark Cassandra Aggregation java.lang.OutOfMemoryError: Java heap space


I've been trying to learn how to use Apache Spark, and I'm having issues trying to sum all of the values in a column in Cassandra (using the DataStax spark-cassandra-connector). Every attempt results in java.lang.OutOfMemoryError: Java heap space.

Here's the code I'm submitting to the Spark master:

    import org.apache.spark.{SparkConf, SparkContext}
    import com.datastax.spark.connector._ // adds cassandraTable to SparkContext

    object Benchmark {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("app")
          .set("spark.cassandra.connection.host", "ec2-blah.compute-1.amazonaws.com")
          .set("spark.cassandra.auth.username", "myusername")
          .set("spark.cassandra.auth.password", "mypassword")
          .set("spark.executor.memory", "4g")
        val sc  = new SparkContext(conf)
        val tbl = sc.cassandraTable("mykeyspace", "mytable")
        // Pull the float column out of every row and sum it (result is a Double).
        val res = tbl.map(_.getFloat("sclrdata")).sum()
        println("sum = " + res)
      }
    }

Right now I have a single Spark worker node in the cluster, and given the size of the table, it's possible that not all of it can fit in memory at once. I didn't think this would be an issue, since Spark is supposed to evaluate operations lazily, and summing the values in a column shouldn't require the entire table to reside in memory at once.
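The reasoning above rests on the fact that a sum only needs a running total. A minimal plain-Scala sketch of that idea (no Spark involved; streamingSum and the generated rows are hypothetical stand-ins for the "sclrdata" column) consumes an Iterator one element at a time, so memory use stays constant regardless of how many rows there are:

```scala
object StreamingSum {
  // Sums values from an iterator, keeping only a running total in memory.
  // Each element is produced, added to the total, and then discarded.
  def streamingSum(values: Iterator[Float]): Double =
    values.foldLeft(0.0)((acc, v) => acc + v)

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for the column: ten million values generated
    // lazily, so they never all exist in memory at the same time.
    val rows: Iterator[Float] = Iterator.range(0, 10000000).map(_ => 0.5f)
    println(streamingSum(rows)) // 5000000.0
  }
}
```

Whether a real Spark job streams like this depends on how the data source and any caching materialize each partition, which is what the answer below investigates.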

I'm a newbie to this topic, so any clarification on why this doesn't work, or how to do it correctly, would be appreciated.

Thanks

Perhaps Spark is building the entire table as a single in-memory partition so that it can do the mapping operations on it.

I thought Spark was supposed to spill to disk rather than throw OutOfMemoryErrors, but maybe it isn't able to spill if there is only a single partition. I saw a similar problem here, and it was solved by specifying the split size like this:

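    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf();
    conf.setAppName("test");
    conf.setMaster("local[4]");
    conf.set("spark.cassandra.connection.host", "192.168.1.15")
        .set("spark.executor.memory", "2g")
        // This setting is in megabytes; 67108864 is 64 MB written in bytes,
        // so a value such as "64" is probably what was intended.
        .set("spark.cassandra.input.split.size_in_mb", "67108864");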

So try setting spark.cassandra.input.split.size_in_mb in your conf. The value is in megabytes, and a smaller value should produce more, smaller partitions.
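To see why the unit matters, here is a rough plain-Scala model of how the split size relates to the number of partitions. It assumes partitions ≈ ceil(tableSize / splitSize); the real connector works from Cassandra's token-range size estimates and may enforce a minimum partition count, so estimatedPartitions and the 20 GB table size are hypothetical illustrations only:

```scala
object SplitEstimate {
  // Rough partition count, assuming partitions ~ ceil(tableSizeMb / splitSizeMb).
  // This is a simplification of what the connector actually does.
  def estimatedPartitions(tableSizeMb: Long, splitSizeMb: Long): Long = {
    require(splitSizeMb > 0, "split size must be positive")
    math.max(1L, (tableSizeMb + splitSizeMb - 1) / splitSizeMb)
  }

  def main(args: Array[String]): Unit = {
    val tableSizeMb = 20480L // hypothetical 20 GB table
    // 64 interpreted as megabytes: many modest partitions.
    println(estimatedPartitions(tableSizeMb, 64L)) // 320
    // 67108864 read as megabytes is roughly 64 TB per split,
    // which collapses the whole table into a single partition.
    println(estimatedPartitions(tableSizeMb, 67108864L)) // 1
  }
}
```

Under this model, a split size accidentally given in bytes would reproduce exactly the single-partition situation suspected above.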

I imagine this would allow Spark to sum the table chunk by chunk and evict finished chunks from memory when it needs space for new ones.
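The chunk-by-chunk idea can be sketched in plain Scala (no Spark; sumByChunks and the generated chunks are hypothetical). Each chunk is reduced to a partial sum and the partial sums are then combined, which is conceptually how an RDD sum works: each task reduces its own partition and the driver merges the results. Only the chunk currently being consumed needs to be live:

```scala
object ChunkedSum {
  // Sums each chunk (think: partition) independently, then combines the
  // partial sums. A finished chunk leaves behind only its partial total.
  def sumByChunks(chunks: Iterator[Iterator[Float]]): Double =
    chunks
      .map(chunk => chunk.foldLeft(0.0)(_ + _)) // partial sum per chunk
      .foldLeft(0.0)(_ + _)                     // combine partial sums

  def main(args: Array[String]): Unit = {
    // Hypothetical table of 1,000,000 rows split into 1,000 chunks of 1,000 rows.
    val chunks: Iterator[Iterator[Float]] =
      Iterator.range(0, 1000).map(_ => Iterator.fill(1000)(1.0f))
    println(sumByChunks(chunks)) // 1000000.0
  }
}
```

With one giant chunk, the same code still works in constant memory here, but in Spark a single huge partition is much more likely to be materialized somewhere along the way, which is why smaller splits are worth trying.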

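Another thing to try is specifying a storage level for the table RDD so that it is allowed to spill to disk. I think you can do this by adding .persist(StorageLevel.MEMORY_AND_DISK) (with import org.apache.spark.storage.StorageLevel); the default appears to be MEMORY_ONLY. You can see more information on storage levels here, in the RDD Persistence section of the Spark programming guide.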
