I'm testing the Spark Streaming API. The application is deployed on an Amazon EMR cluster running Spark 1.4.0; it sorts the data and saves the files in S3.
The code of the pipeline (except the sort algorithm) is detailed below:
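import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kinesis.KinesisUtils;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.joda.time.DateTime;

public KinesisPreprocessPipeline(JavaStreamingContext jssc, final KinesisPreprocessModuleConfiguration moduleConfiguration) {
    // Receive raw records from Kinesis (Kinesis checkpoint interval: 5 seconds).
    JavaReceiverInputDStream<byte[]> inputDStream = KinesisUtils.createStream(
            jssc,
            moduleConfiguration.getAppName(),
            moduleConfiguration.getStreamName(),
            "kinesis." + moduleConfiguration.getRegion() + ".amazonaws.com",
            moduleConfiguration.getRegion(),
            InitialPositionInStream.LATEST,
            Durations.seconds(5),
            StorageLevel.MEMORY_AND_DISK_SER());

    // Deserialize each record from JSON.
    JavaDStream<StreamingMessage> messageJavaDStream = inputDStream.map(new Function<byte[], StreamingMessage>() {
        @Override
        public StreamingMessage call(byte[] bytes) throws Exception {
            return JsonParser.fromJson(new String(bytes), StreamingMessage.class);
        }
    });

    final String destinationFolder = moduleConfiguration.getDestinationFolder();

    // Apply the sort step (SortPreprocess is not shown here).
    StreamingPreprocessPipeline pipeline = new StreamingPreprocessPipeline()
            .withInputDStream(messageJavaDStream)
            .withPreprocessStep(new SortPreprocess());

    JavaDStream<StreamingMessage> output = pipeline.execute();
    output.checkpoint(Durations.seconds(moduleConfiguration.getBatchInterval() * 2));

    // Serialize the sorted messages back to JSON.
    JavaDStream<String> messagesAsJson = output.map(new Function<StreamingMessage, String>() {
        @Override
        public String call(StreamingMessage message) throws Exception {
            return JsonParser.toJson(message);
        }
    });

    // Output action: this is what triggers the lazy transformations above (including the sort) for each batch.
    messagesAsJson.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) throws Exception {
            rdd.saveAsTextFile(destinationFolder + "/" + dateFormat.print(new DateTime()) + "-" + rdd.id());
            return null;
        }
    });
}

When the application runs on the cluster, it fails fast with the following error: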
15/07/17 13:17:36 ERROR executor.Executor: Exception in task 0.1 in stage 8.0 (TID 90)
java.lang.IllegalArgumentException: Comparison method violates its general contract!
    at org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:776)
    at org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:507)
    at org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:435)
    at org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:307)
    at org.apache.spark.util.collection.TimSort.sort(TimSort.java:135)
    at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
    at org.apache.spark.util.collection.PartitionedPairBuffer.partitionedDestructiveSortedIterator(PartitionedPairBuffer.scala:70)
    at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:690)
    at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:708)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
The error happens at the foreachRDD step. I'm still searching for why it fails...
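The class used for sorting had a bug in its compareTo implementation. TimSort throws "Comparison method violates its general contract!" when it detects that the comparison is not a consistent total order, for example when it is not transitive, or when sgn(a.compareTo(b)) != -sgn(b.compareTo(a)). The Javadoc of Comparable also strongly recommends implementing compareTo so that it is consistent with equals(). The failure shows up at the foreachRDD step only because DStream transformations are lazy: the sort actually runs when saveAsTextFile triggers the job. After fixing the bug, the Spark job works as expected.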
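A minimal sketch of this kind of bug and its fix, assuming a hypothetical sort key with a `long` timestamp and a `String` id (the post does not show the real class, so `SortKey`, its fields, and `brokenCompareTo` are illustrative only). It contrasts a subtraction-based comparison, which overflows when cast to `int` and breaks transitivity, with `Long.compare` plus a tie-break that keeps `compareTo` consistent with `equals()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class CompareToExample {

    // Hypothetical sort key; the real class from the post is not shown.
    static final class SortKey implements Comparable<SortKey> {
        final long timestamp;
        final String id;

        SortKey(long timestamp, String id) {
            this.timestamp = timestamp;
            this.id = Objects.requireNonNull(id);
        }

        // Broken variant, kept for contrast: the long difference is truncated
        // to int, so large gaps can flip sign and transitivity no longer holds.
        int brokenCompareTo(SortKey other) {
            return (int) (this.timestamp - other.timestamp);
        }

        // Fixed variant: Long.compare cannot overflow, and the tie-break on id
        // makes compareTo return 0 only when equals() returns true.
        @Override
        public int compareTo(SortKey other) {
            int byTime = Long.compare(this.timestamp, other.timestamp);
            return byTime != 0 ? byTime : this.id.compareTo(other.id);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SortKey)) return false;
            SortKey that = (SortKey) o;
            return timestamp == that.timestamp && id.equals(that.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, id);
        }
    }

    public static void main(String[] args) {
        SortKey a = new SortKey(0L, "a");
        SortKey c = new SortKey(1_500_000_000L, "c");
        SortKey b = new SortKey(3_000_000_000L, "b");

        // With the broken comparison, a < c and c < b, yet a > b.
        System.out.println("broken: a<c=" + (a.brokenCompareTo(c) < 0)
                + ", c<b=" + (c.brokenCompareTo(b) < 0)
                + ", a<b=" + (a.brokenCompareTo(b) < 0));

        // With the fixed comparison, the order is transitive.
        System.out.println("fixed:  a<c=" + (a.compareTo(c) < 0)
                + ", c<b=" + (c.compareTo(b) < 0)
                + ", a<b=" + (a.compareTo(b) < 0));

        // Sorting with the natural (fixed) ordering.
        List<SortKey> keys = new ArrayList<>(Arrays.asList(b, a, c));
        Collections.sort(keys);
        StringBuilder order = new StringBuilder();
        for (SortKey k : keys) {
            order.append(k.id);
        }
        System.out.println("sorted: " + order);
    }
}
```

Running `main` should print `broken: a<c=true, c<b=true, a<b=false`, then `fixed:  a<c=true, c<b=true, a<b=true`, then `sorted: acb`. The same principle applies to whatever comparator or `Comparable` the Spark sort step relies on: compare numeric fields with `Long.compare`/`Integer.compare` rather than subtraction.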