I'm trying a super simple test program on Spark 2.0 on Amazon EMR 5.0:
    from pyspark.sql.types import Row
    from pyspark.sql.types import *
    import pyspark.sql.functions as spark_functions

    schema = StructType([
        StructField("colA", StringType()),
        StructField("colB", IntegerType()),
    ])

    rows = [
        Row("alpha", 1),
        Row("beta", 2),
        Row("gamma", 3),
        Row("delta", 4),
    ]

    data_frame = spark.createDataFrame(rows, schema)
    print("count={}".format(data_frame.count()))

    data_frame.write.save("s3a://test3/test_data.parquet", mode="overwrite")

    print("done")
This results in:
    count=4

    Py4JJavaError: An error occurred while calling o85.save.
    : org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
        at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:211)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalArgumentException: bound must be positive
        at java.util.Random.nextInt(Random.java:388)
        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:305)
        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
        at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
        at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:421)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:894)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:791)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:780)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:336)
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
        at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
        ... 29 more

    (<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError(u'An error occurred while calling o85.save.\n', JavaObject id=o86), <traceback object at 0x7fa65dec5368>)
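A note on the root cause (my reading of the trace, not something the original post spells out): java.util.Random.nextInt(bound) throws exactly this "bound must be positive" error when bound <= 0, and LocalDirAllocator.confChanged picks a starting directory at random from its list of usable local buffer directories. If that list comes up empty (no valid fs.s3a.buffer.dir), the random call gets a zero bound and the write aborts. A rough Python analogy of the failure mode (illustrative names, not Hadoop's actual code):

    import random

    # LocalDirAllocator keeps a list of usable local buffer directories and
    # picks one at random. With no valid fs.s3a.buffer.dir entries the list
    # is empty, so the random pick fails -- the Java version of this surfaces
    # as IllegalArgumentException: bound must be positive.
    usable_dirs = []  # illustrative: no usable buffer directories were found
    index = random.randrange(len(usable_dirs))  # ValueError: empty range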
I had the same issue, and after a lot of messing around it appears that s3:// and s3n:// work, but they are a lot slower than s3a://. The only way I could get s3a:// to work was by setting the buffer directory, though then it isn't doing the fast copy direct from memory:
    hadoopConf = sc._jsc.hadoopConfiguration()
    hadoopConf.set("fs.s3a.buffer.dir", "/home/hadoop,/tmp")
It's unfortunately no quicker than normal s3/s3n even with this enabled, however!
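Aside: if you'd rather not reach into sc._jsc at runtime, the same property should also be settable when the session is built, via Spark's spark.hadoop.* passthrough. A minimal sketch under that assumption (the app name is just illustrative):

    from pyspark.sql import SparkSession

    # spark.hadoop.* properties are copied into the Hadoop Configuration,
    # so this should be equivalent to calling hadoopConf.set(...) afterwards.
    spark = SparkSession.builder \
        .appName("s3a-write-test") \
        .config("spark.hadoop.fs.s3a.buffer.dir", "/home/hadoop,/tmp") \
        .getOrCreate()

    sc = spark.sparkContext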
Edit: Adding the following also works to get rid of the error; I realised afterwards that I had been assuming it was doing the fast copy. Unfortunately it's no faster either...

    hadoopConf.set("fs.s3a.fast.upload", "true")
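Putting both settings together with the original test write, a working version looks roughly like this (same illustrative bucket/path as in the question):

    # Set both workarounds before writing: a valid local buffer directory
    # list, plus the in-memory fast upload path.
    hadoopConf = sc._jsc.hadoopConfiguration()
    hadoopConf.set("fs.s3a.buffer.dir", "/home/hadoop,/tmp")
    hadoopConf.set("fs.s3a.fast.upload", "true")

    data_frame.write.save("s3a://test3/test_data.parquet", mode="overwrite")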