I am trying to use Apache Spark to process separate CSV files in a directory in parallel. Specifically, I want each of the slave nodes to add up the numbers in the first column of each CSV file, and then send back the computed result. The following is my code:
import os, sys, inspect, csv ### current directory path. curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0] ### setup environment variables spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../../spark-1.4.0"))) python_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../python"))) os.environ["spark_home"] = spark_home_dir os.environ["pythonpath"] = python_dir ### setup pyspark directory path pyspark_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../python"))) sys.path.append(pyspark_dir) ### import pyspark pyspark import sparkconf, sparkcontext ### specify data file directory, , load data files data_path = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "./test_dir"))) ### myfunc add numbers in first column. def myfunc(s): s_new = os.path.realpath(os.path.abspath(os.path.join(data_path, s))) cr = csv.reader(open(s_new,"rb")) total = 0 row in cr: total += int(row[0]) return total def main(): ### initialize sparkconf , sparkcontext conf = sparkconf().setappname("ruofan").setmaster("local") sc = sparkcontext(conf = conf) datafile = sc.wholetextfiles(data_path) ### sent application in each of slave node temp = datafile.foreach(myfunc) ### collect result , print out. x in temp.sample(false, 1).collect(): print x if __name__ == "__main__": main() but when run code, shows error below:
AttributeError: 'tuple' object has no attribute 'startswith' — and the stack trace is as follows:
error executor: exception in task 0.0 in stage 0.0 (tid 0) org.apache.spark.api.python.pythonexception: traceback (most recent call last): file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 304, in func return f(iterator) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 719, in processpartition f(x) file "sum.py", line 24, in myfunc s_new = os.path.realpath(os.path.abspath(os.path.join(data_path, s))) file "/usr/lib/python2.7/posixpath.py", line 75, in join if b.startswith('/'): attributeerror: 'tuple' object has no attribute 'startswith' @ org.apache.spark.api.python.pythonrdd$$anon$1.read(pythonrdd.scala:138) @ org.apache.spark.api.python.pythonrdd$$anon$1.<init>(pythonrdd.scala:179) @ org.apache.spark.api.python.pythonrdd.compute(pythonrdd.scala:97) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:277) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:244) @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:63) @ org.apache.spark.scheduler.task.run(task.scala:70) @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:213) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615) @ 
java.lang.thread.run(thread.java:745) 15/07/14 16:52:15 warn tasksetmanager: lost task 0.0 in stage 0.0 (tid 0, localhost): org.apache.spark.api.python.pythonexception: traceback (most recent call last): file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 304, in func return f(iterator) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 719, in processpartition f(x) file "sum.py", line 24, in myfunc s_new = os.path.realpath(os.path.abspath(os.path.join(data_path, s))) file "/usr/lib/python2.7/posixpath.py", line 75, in join if b.startswith('/'): attributeerror: 'tuple' object has no attribute 'startswith' @ org.apache.spark.api.python.pythonrdd$$anon$1.read(pythonrdd.scala:138) @ org.apache.spark.api.python.pythonrdd$$anon$1.<init>(pythonrdd.scala:179) @ org.apache.spark.api.python.pythonrdd.compute(pythonrdd.scala:97) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:277) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:244) @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:63) @ org.apache.spark.scheduler.task.run(task.scala:70) @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:213) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145) @ 
java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615) @ java.lang.thread.run(thread.java:745) 15/07/14 16:52:15 error tasksetmanager: task 0 in stage 0.0 failed 1 times; aborting job 15/07/14 16:52:15 info taskschedulerimpl: removed taskset 0.0, tasks have completed, pool 15/07/14 16:52:15 info taskschedulerimpl: cancelling stage 0 15/07/14 16:52:15 info dagscheduler: resultstage 0 (foreach @ sum.py:40) failed in 0.408 s 15/07/14 16:52:15 info dagscheduler: job 0 failed: foreach @ sum.py:40, took 0.458805 s traceback (most recent call last): file "sum.py", line 47, in <module> main() file "sum.py", line 40, in main temp = datafile.foreach(myfunc) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 721, in foreach self.mappartitions(processpartition).count() # force evaluation file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 972, in count return self.mappartitions(lambda i: [sum(1 _ in i)]).sum() file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 963, in sum return self.mappartitions(lambda x: [sum(x)]).reduce(operator.add) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 771, in reduce vals = self.mappartitions(func).collect() file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 745, in collect port = self.ctx._jvm.pythonrdd.collectandserve(self._jrdd.rdd()) file "/usr/local/lib/python2.7/dist-packages/py4j-0.8.2.1-py2.7.egg/py4j/java_gateway.py", line 538, in __call__ self.target_id, self.name) file "/usr/local/lib/python2.7/dist-packages/py4j-0.8.2.1-py2.7.egg/py4j/protocol.py", line 300, in get_return_value format(target_id, '.', name), value) py4j.protocol.py4jjavaerror: error occurred while calling z:org.apache.spark.api.python.pythonrdd.collectandserve. 
: org.apache.spark.sparkexception: job aborted due stage failure: task 0 in stage 0.0 failed 1 times, recent failure: lost task 0.0 in stage 0.0 (tid 0, localhost): org.apache.spark.api.python.pythonexception: traceback (most recent call last): file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() file "/home/ying/aws_tutorial/spark-1.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 2318, in pipeline_func return func(split, prev_func(split, iterator)) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 304, in func return f(iterator) file "/home/ying/aws_tutorial/spark-1.4.0/python/pyspark/rdd.py", line 719, in processpartition f(x) file "sum.py", line 24, in myfunc s_new = os.path.realpath(os.path.abspath(os.path.join(data_path, s))) file "/usr/lib/python2.7/posixpath.py", line 75, in join if b.startswith('/'): attributeerror: 'tuple' object has no attribute 'startswith' @ org.apache.spark.api.python.pythonrdd$$anon$1.read(pythonrdd.scala:138) @ org.apache.spark.api.python.pythonrdd$$anon$1.<init>(pythonrdd.scala:179) @ org.apache.spark.api.python.pythonrdd.compute(pythonrdd.scala:97) @ org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd.scala:277) @ org.apache.spark.rdd.rdd.iterator(rdd.scala:244) @ org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:63) @ org.apache.spark.scheduler.task.run(task.scala:70) @ org.apache.spark.executor.executor$taskrunner.run(executor.scala:213) @ 
java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615) @ java.lang.thread.run(thread.java:745) driver stacktrace: @ org.apache.spark.scheduler.dagscheduler.org$apache$spark$scheduler$dagscheduler$$failjobandindependentstages(dagscheduler.scala:1266) @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1257) @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1256) @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59) @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:47) @ org.apache.spark.scheduler.dagscheduler.abortstage(dagscheduler.scala:1256) @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:730) @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:730) @ scala.option.foreach(option.scala:236) @ org.apache.spark.scheduler.dagscheduler.handletasksetfailed(dagscheduler.scala:730) @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1450) @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1411) @ org.apache.spark.util.eventloop$$anon$1.run(eventloop.scala:48) i don't know why code not work. appreciate if can me solve problem. thanks!
The stack trace makes it pretty evident that the error is coming from this line in myfunc:
s_new = os.path.realpath(os.path.abspath(os.path.join(data_path, s))) Here, s is expected to be a string (a path component for os.path.join), but you are passing a tuple:
temp = datafile.foreach(myfunc) where datafile comes from wholeTextFiles, which results in an RDD[(String, String)]. The first item in each tuple is the file path, and the second is the file contents. So, you need to pass only the first part of the tuple (the path) to your code:
datafile.foreach(lambda (path, content): myfunc(path))
Comments
Post a Comment