I've been trying to learn how to use Apache Spark, and I'm having issues trying to sum all of the values in a column from Cassandra (using the DataStax spark-cassandra-connector). Everything I try results in a java.lang.OutOfMemoryError: Java heap space.
Here's the code I'm submitting to the Spark master:
    import org.apache.spark.{SparkConf, SparkContext}
    import com.datastax.spark.connector._   // adds cassandraTable to SparkContext

    object Benchmark {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("app")
          .set("spark.cassandra.connection.host", "ec2-blah.compute-1.amazonaws.com")
          .set("spark.cassandra.auth.username", "myusername")
          .set("spark.cassandra.auth.password", "mypassword")
          .set("spark.executor.memory", "4g")
        val sc = new SparkContext(conf)
        val tbl = sc.cassandraTable("mykeyspace", "mytable")
        // Read one column from each row and sum it
        val res = tbl.map(_.getFloat("sclrdata")).sum()
        println("sum = " + res)
      }
    }

Right now I have only a single Spark worker node in my cluster, and it's quite possible that, given the size of the table, not all of it can fit in memory at once. I didn't think this would be an issue, since Spark is supposed to lazily evaluate commands, and summing the values in a column shouldn't need the entire table to reside in memory at once.
I'm a newbie on this topic, so any clarification as to why this wouldn't work, or how to do it correctly, would be appreciated.
Thanks
Perhaps Spark is building the entire table as a single in-memory partition so that it can do the mapping operations on it.
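If you want to test that guess, one thing you could do is print how many partitions the RDD has before running the sum. This is only a sketch: it uses a local master and sc.parallelize as a stand-in for sc.cassandraTable, and the object and helper names (PartitionCheck, partitionCount) are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PartitionCheck {
  // Hypothetical helper: how many partitions Spark will process for this RDD.
  def partitionCount(rdd: RDD[_]): Int = rdd.partitions.length

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partition-check").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Stand-in for sc.cassandraTable("mykeyspace", "mytable"); assumption for the sketch.
      val rdd = sc.parallelize(1 to 1000, numSlices = 8)
      println("partitions = " + partitionCount(rdd))
    } finally {
      sc.stop()
    }
  }
}
```

If the count for your Cassandra table comes back as 1, that would support the single-partition theory.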
I thought Spark was supposed to spill to disk rather than throw OutOfMemoryExceptions, but maybe it isn't able to spill if there is only a single partition. I saw a similar problem here, and they solved it by specifying the split size like this:
    val conf = new SparkConf();
    conf.setAppName("test");
    conf.setMaster("local[4]");
    conf.set("spark.cassandra.connection.host", "192.168.1.15").
         set("spark.executor.memory", "2g").
         set("spark.cassandra.input.split.size_in_mb", "67108864");

So try setting spark.cassandra.input.split.size_in_mb in your conf. Note that 67108864 is 64 MB expressed in bytes, so check which unit your connector version expects for this setting.
I imagine this would allow Spark to sum up chunks of the table and then evict those chunks from memory when it needs space for new chunks.
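To illustrate the idea of summing chunk by chunk, here is a plain Scala sketch with no Spark involved. It is not how Spark implements this internally; the names ChunkedSum and chunkedSum are made up for the example. It just shows that a sum only needs one chunk in memory at a time, plus the running totals.

```scala
object ChunkedSum {
  // Sum each chunk independently, then combine the per-chunk totals.
  // `grouped` on an Iterator materializes only one chunk at a time.
  def chunkedSum(values: Iterator[Float], chunkSize: Int): Double =
    values.grouped(chunkSize).map(chunk => chunk.map(_.toDouble).sum).sum

  def main(args: Array[String]): Unit = {
    // One million values produced lazily; they are never all held at once.
    val values = Iterator.tabulate(1000000)(_ => 1.0f)
    println("sum = " + chunkedSum(values, chunkSize = 10000))
  }
}
```

A smaller split size should play roughly the role of chunkSize here: more, smaller partitions, each of which can be summed and then dropped.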
Another thing you could try is specifying a storage level for your table RDD that will allow it to spill to disk. I think you'd do this by adding ".persist(StorageLevel.MEMORY_AND_DISK)". The default appears to be MEMORY_ONLY. See more information on storage levels here, in the RDD persistence section.
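Roughly, I'd expect the persist call to slot in like this. Again, this is a sketch under assumptions: it runs on a local master and uses sc.parallelize in place of the Cassandra table, and PersistSketch and sumPersisted are names I made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  // Hypothetical helper: persist the values with a disk-backed level, then sum them.
  def sumPersisted(sc: SparkContext, data: Seq[Float]): Double = {
    // Stand-in for sc.cassandraTable("mykeyspace", "mytable").map(_.getFloat("sclrdata"))
    val values = sc.parallelize(data)
    // MEMORY_AND_DISK lets partitions that do not fit in memory be stored on disk
    val persisted = values.persist(StorageLevel.MEMORY_AND_DISK)
    persisted.sum()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("persist-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      println("sum = " + sumPersisted(sc, Seq(1.5f, 2.5f, 3.0f)))
    } finally {
      sc.stop()
    }
  }
}
```

Keep in mind that persisting mostly pays off when the same RDD is reused across several actions, so for a single sum it may not change much.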