Any help would be greatly appreciated.
This is how we're running jupyter/all-spark-notebook on a separate Amazon Linux EC2 node:
docker run -t -i -p 8888:8888 -p 4040:4040 -p 4041:4041 -e SPARK_OPTS='--master=spark://[EC2_HOST].us-west-2.compute.amazonaws.com:7077 --packages=org.apache.hadoop:hadoop-aws:2.7.1,org.apache.httpcomponents:httpclient:4.3.6,org.apache.httpcomponents:httpcore:4.3.3,com.amazonaws:aws-java-sdk-core:1.10.27,com.amazonaws:aws-java-sdk-s3:1.10.27,com.amazonaws:aws-java-sdk-sts:1.10.27' --net=host --pid=host -e TINI_SUBREAPER=true -v /home/ec2-user:/home/jovyan/work jupyter/all-spark-notebook
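The `--packages` list pins hadoop-aws 2.7.1. The hadoop-aws artifact generally has to match the Hadoop version of the Spark build it runs with, and the cluster's Hadoop version isn't stated here. A small sketch with hypothetical names (`PackageCheck`, `parse`, `hadoopMismatches` are made up for illustration) that splits a `--packages` value into Maven coordinates and lists the `org.apache.hadoop` artifacts whose version differs from an assumed cluster version:

```scala
// Hypothetical helper: parse a --packages value and compare the Hadoop
// artifacts against the Hadoop version the cluster is assumed to run.
object PackageCheck {
  final case class Coordinate(group: String, artifact: String, version: String)

  // Split "group:artifact:version,group:artifact:version,..." into coordinates.
  def parse(packages: String): Seq[Coordinate] =
    packages.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { coord =>
      coord.split(":") match {
        case Array(g, a, v) => Coordinate(g, a, v)
        case _ => throw new IllegalArgumentException(s"Not group:artifact:version: $coord")
      }
    }

  // Return the org.apache.hadoop artifacts whose version differs from clusterHadoopVersion.
  def hadoopMismatches(coords: Seq[Coordinate], clusterHadoopVersion: String): Seq[Coordinate] =
    coords.filter(c => c.group == "org.apache.hadoop" && c.version != clusterHadoopVersion)
}
```

For example, `PackageCheck.hadoopMismatches(PackageCheck.parse(pkgs), "2.4.0")` would flag hadoop-aws if the cluster ran Hadoop 2.4.0; that version is only a placeholder, so substitute whatever the cluster's Spark build actually uses.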
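Sample notebook code (sc and sqlContext are created by the Toree kernel):
// S3 credentials for the s3n connector (placeholders)
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "[AWS_KEY]")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "[AWS_SECRET]")
// Loading Parquet reads the file footers to infer the schema
val df = sqlContext.read.load("s3n://snowflake-sample-data/airlines_parquet")
// Fetch a single row
df.take(1)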
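The two `set` calls modify `sc.hadoopConfiguration` after the context has been created. Spark also copies any property named `spark.hadoop.<key>` into the Hadoop configuration it builds (with the prefix stripped), so the same settings can be supplied at launch time, e.g. through SPARK_OPTS. A sketch with hypothetical names (`S3nSettings`, `hadoopProps`, `confArgs`) that builds those `--conf` arguments and also names the s3n implementation class explicitly through `fs.s3n.impl`:

```scala
// Hypothetical helper: express the s3n settings as spark.hadoop.* properties,
// which Spark copies (minus the prefix) into its Hadoop Configuration.
object S3nSettings {
  val Prefix = "spark.hadoop."

  def hadoopProps(accessKey: String, secretKey: String): Map[String, String] = Map(
    // Name the s3n implementation explicitly instead of relying on classpath discovery.
    "fs.s3n.impl" -> "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
    "fs.s3n.awsAccessKeyId" -> accessKey,
    "fs.s3n.awsSecretAccessKey" -> secretKey
  )

  // Turn Hadoop keys into "--conf spark.hadoop.key=value" pairs, sorted by key for stable output.
  def confArgs(props: Map[String, String]): Seq[String] =
    props.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$Prefix$k=$v") }
}
```

Setting `fs.s3n.impl` only changes which class Hadoop looks for: if hadoop-aws is not on an executor's classpath, that executor will still fail, just with a class-not-found error instead. Credentials passed this way may also end up visible on the process command line.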
$ cat spark-defaults.conf
spark.executor.memory 6154m
spark.executor.extraLibraryPath /root/ephemeral-hdfs/lib/native/
spark.executor.extraClassPath /root/ephemeral-hdfs/conf
# for spark version < 1.4.0
spark.tachyonStore.url tachyon://ec2-54-245-36-142.us-west-2.compute.amazonaws.com:19998
# for spark version >= 1.4.0
spark.externalBlockStore.url tachyon://ec2-54-245-36-142.us-west-2.compute.amazonaws.com:19998
# jars for s3
spark.jars.packages org.apache.hadoop:hadoop-aws:2.7.1,org.apache.httpcomponents:httpclient:4.3.6,org.apache.httpcomponents:httpcore:4.3.3,com.amazonaws:aws-java-sdk-core:1.10.27,com.amazonaws:aws-java-sdk-s3:1.10.27,com.amazonaws:aws-java-sdk-sts:1.10.27
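Spark 1.x loads spark-defaults.conf with `java.util.Properties`, so a key and its value are separated by whitespace (or `=`/`:`) and lines starting with `#` are comments. A minimal sketch, using a hypothetical `SparkDefaults.parse` helper, for checking what a node's copy of the file actually contains:

```scala
import java.io.StringReader
import java.util.Properties

// Sketch: parse spark-defaults.conf text with java.util.Properties
// (whitespace, '=' or ':' end the key; '#' and '!' lines are comments),
// trimming each value.
object SparkDefaults {
  def parse(text: String): Map[String, String] = {
    val props = new Properties()
    props.load(new StringReader(text))
    val names = props.stringPropertyNames().toArray(new Array[String](0))
    names.map(k => k -> props.getProperty(k).trim).toMap
  }
}
```

This only shows what the file on that node says. As far as I know, executors in standalone mode receive application settings such as `spark.jars.packages` from the driver (here, the Docker container launched with SPARK_OPTS) rather than from the worker's own spark-defaults.conf.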
It works fine with local Spark but fails with this exception against the remote standalone Spark cluster:
Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 4, ip-172-31-23-28.us-west-2.compute.internal): java.io.IOException: Could not read footer: java.io.IOException: Could not read footer for file org.apache.hadoop.fs.FileStatus@62387f83
at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:247)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$27.apply(ParquetRelation.scala:786)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$27.apply(ParquetRelation.scala:775)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not read footer for file org.apache.hadoop.fs.FileStatus@62387f83
at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:239)
at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:233)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
... 3 more
Caused by: java.io.IOException: No FileSystem for scheme: s3n
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1443)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:67)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1464)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:263)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:405)
at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:237)
... 5 more
Driver stacktrace:
StackTrace: org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
scala.Option.foreach(Option.scala:236)
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
org.apache.spark.rdd.RDD.collect(RDD.scala:926)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$.mergeSchemasInParallel(ParquetRelation.scala:799)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$MetadataCache$$readSchema(ParquetRelation.scala:517)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache$$anonfun$12.apply(ParquetRelation.scala:421)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache$$anonfun$12.apply(ParquetRelation.scala:421)
scala.Option.orElse(Option.scala:257)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:421)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:202)
org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:37)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
$line12.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:21)
$line12.$read$$iwC$$iwC$$iwC.<init>(<console>:26)
$line12.$read$$iwC$$iwC.<init>(<console>:28)
$line12.$read$$iwC.<init>(<console>:30)
$line12.$read.<init>(<console>:32)
$line12.$read$.<init>(<console>:36)
$line12.$read$.<clinit>(<console>)
$line12.$eval$.<init>(<console>:7)
$line12.$eval$.<clinit>(<console>)
$line12.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:361)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:356)
org.apache.toree.global.StreamState$.withStreams(StreamState.scala:81)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:355)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:355)
org.apache.toree.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:140)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
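The root cause is the last `Caused by`: `No FileSystem for scheme: s3n`, thrown on an executor (TID 4 on ip-172-31-23-28) while Parquet reads the file footers. A simplified model of how Hadoop 2.x picks a FileSystem class for a scheme (an explicit `fs.<scheme>.impl` key first, then implementations registered on the classpath through `META-INF/services`) shows how the same path can resolve in one JVM and fail in another. This is an illustration, not Hadoop's code, and `SchemeResolution` is a made-up name:

```scala
import java.io.IOException

// Simplified model of Hadoop's scheme lookup: config key first, then
// implementations discovered on the classpath, otherwise the error from the trace.
// It ignores the case where fs.<scheme>.impl names a class that cannot be loaded.
object SchemeResolution {
  def resolve(scheme: String,
              conf: Map[String, String],
              discoveredOnClasspath: Map[String, String]): String =
    conf.get(s"fs.$scheme.impl")
      .orElse(discoveredOnClasspath.get(scheme))
      .getOrElse(throw new IOException(s"No FileSystem for scheme: $scheme"))
}
```

In this model, a JVM whose classpath carries hadoop-aws resolves s3n, while one without it and without an `fs.s3n.impl` entry hits exactly this message. That fits the symptom (local Spark works, the remote executors fail), but it is one plausible reading of the trace rather than a confirmed diagnosis.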
I forgot to mention that I did copy this spark-defaults.conf to the worker(s), and it didn't help:
~/spark-ec2/copy-dir /root/spark/conf/spark-defaults.conf
How did you instantiate the sc variable?
@taeric Jupyter created the sc variable automatically as part of the Toree kernel.
For anyone following along: Spark 2.0 seems to help with the S3 issues, but now I need a Jupyter/Toree build against Spark 2.0.
See related issues.
I see that you are connecting to a Spark context on an EC2 instance. I'm curious whether you tried taking advantage of IAM roles to connect to S3. In particular, it looks like you should be able to use "s3n" addresses from within your cluster and have it "just work."
This issue has been idle for a few months. The upgrade to Spark 2.x and Toree may help resolve the issue.
If anyone would like to submit a recipe for connecting to S3 using Spark, we'll gladly take it.