EOFException using KryoSerializer
I'm seeing the following exception ONLY when I run on a Mesos cluster. If I run the exact same code with master set to "local[N]" I have no problem:

2015-05-19 16:45:43,484 [task-result-getter-0] WARN TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, 10.253.1.101): java.io.EOFException
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

KryoSerializer explicitly throws an EOFException. The comment says:

// DeserializationStream uses the EOF exception to indicate stopping condition.

Apparently this isn't what TorrentBroadcast expects. Any suggestions?

Thanks
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: EOFException using KryoSerializer
I finally got back to this and I just wanted to let anyone who runs into this know that the problem is a Kryo version issue. Spark (at least 1.4.0) depends on Kryo 2.21, while my client had 2.24.0 on the classpath. Changing it to 2.21 fixed the problem.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948p23479.html
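In case it helps anyone applying the same fix, the version pin can be expressed in the build. This is a sketch assuming an sbt build (the coordinates shown are the pre-3.x Kryo ones; adjust to however the 2.24.0 dependency enters your classpath):

```scala
// Force the Kryo version Spark 1.4.0 was built against, overriding the
// transitive 2.24.0 that was otherwise pulled onto the classpath.
dependencyOverrides += "com.esotericsoftware.kryo" % "kryo" % "2.21"
```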
Non-classification neural networks
Hello all,

We were using the old "Artificial Neural Network": https://github.com/apache/spark/pull/1290

This code appears to have been incorporated into 1.5.2, but it's only exposed publicly via the MultilayerPerceptronClassifier. Is there a way to use the old feedforward/backprop non-classification functionality? It appears to be buried in private classes, and it's not obvious to me whether the MultilayerPerceptronClassifier can be used without the classification. The doc says "Number of outputs has to be equal to the total number of labels." What if the output is continuous and you simply want to do prediction?

Thanks
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Non-classification-neural-networks-tp26604.html
Spark on mesos in docker not getting parameters
I'm running Spark 2.0.0 on Mesos using spark.mesos.executor.docker.image to point to a docker container that I built with the Spark installation. Everything is working except that the Spark client process started inside the container doesn't get any of the parameters I set in the Spark config in the driver. I set spark.executor.extraJavaOptions and spark.executor.extraClassPath in the driver and they don't get passed all the way through.

Here is a capture of the chain of processes that are started on the mesos slave, in the docker container:

root 1064 1051 0 12:46 ? 00:00:00 docker -H unix:///var/run/docker.sock run --cpu-shares 8192 --memory 4723834880 -e SPARK_CLASSPATH=[path to my jar] -e SPARK_EXECUTOR_OPTS= -Daws.accessKeyId=[myid] -Daws.secretKey=[mykey] -e SPARK_USER=root -e SPARK_EXECUTOR_MEMORY=4096m -e MESOS_SANDBOX=/mnt/mesos/sandbox -e MESOS_CONTAINER_NAME=mesos-90e2c720-1e45-4dbc-8271-f0c47a33032a-S0.772f8080-6278-4a35-9e57-0009787ac605 -v /tmp/mesos/slaves/90e2c720-1e45-4dbc-8271-f0c47a33032a-S0/frameworks/f5794f8a-b56f-4958-b906-f05c426dcef0-0001/executors/0/runs/772f8080-6278-4a35-9e57-0009787ac605:/mnt/mesos/sandbox --net host --entrypoint /bin/sh --name mesos-90e2c720-1e45-4dbc-8271-f0c47a33032a-S0.772f8080-6278-4a35-9e57-0009787ac605 [my docker image] -c "/opt/spark/./bin/spark-class" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0 --hostname 192.168.10.145 --cores 8 --app-id f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root 1193 1175 0 12:46 ? 00:00:00 /bin/sh -c "/opt/spark/./bin/spark-class" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0 --hostname 192.168.10.145 --cores 8 --app-id f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root 1208 1193 0 12:46 ? 00:00:00 bash /opt/spark/./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0 --hostname 192.168.10.145 --cores 8 --app-id f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root 1213 1208 0 12:46 ? 00:00:00 bash /opt/spark/./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0 --hostname 192.168.10.145 --cores 8 --app-id f5794f8a-b56f-4958-b906-f05c426dcef0-0001

root 1215 1213 0 12:46 ? 00:00:00 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Xmx128m -cp /opt/spark/jars/* org.apache.spark.launcher.Main org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@192.168.10.145:46121 --executor-id 0 --hostname 192.168.10.145 --cores 8 --app-id f5794f8a-b56f-4958-b906-f05c426dcef0-0001

Notice that in the initial process started by Mesos, SPARK_CLASSPATH is set to the value of spark.executor.extraClassPath and the -D options are set as I set them in spark.executor.extraJavaOptions (in this case, to my aws creds) in the driver configuration. However, they are missing in the subsequent child processes, and the final java process that's started doesn't contain them either. I "fixed" the classpath problem by putting my jar in /opt/spark/jars (/opt/spark is the location where I have Spark installed in the docker container).

Can someone tell me what I'm missing?

Thanks
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-mesos-in-docker-not-getting-parameters-tp27500.html
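For reference, the settings in question were set on the driver side roughly like this (a sketch; the image name, jar path, and credential values here are placeholders, not the actual ones):

```scala
import org.apache.spark.SparkConf

// Driver-side configuration whose values show up in the first process Mesos
// starts but fail to survive into the child processes inside the container.
// All values below are placeholders.
val conf = new SparkConf()
  .set("spark.mesos.executor.docker.image", "my-spark-image")
  .set("spark.executor.extraClassPath", "/opt/spark/jars/myapp.jar")
  .set("spark.executor.extraJavaOptions",
       "-Daws.accessKeyId=MYID -Daws.secretKey=MYKEY")
```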
Re: Writing files to s3 with out temporary directory
I have this exact issue. I was going to intercept the call in the filesystem if I had to (since we're using the S3 filesystem from Presto anyway), but if there's simply a way to do this correctly I'd much prefer it. This basically doubles the time to write parquet files to s3.

-- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
Re: Writing files to s3 with out temporary directory
Thanks. In the meantime I might just write a custom file system that maps writes of parquet file parts to their final locations and then skips the move.
Re: Writing files to s3 with out temporary directory
It's not actually that tough. We already use a custom Hadoop FileSystem for S3 because, when we started using Spark with S3, the native FileSystem was very unreliable. Ours is based on the code from Presto (see https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java ).

I already have a version that introduces a hash into the name of the file that's actually written to S3, to see if it makes a difference per https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html#get-workload-considerations . FWIW, it doesn't.

I'm going to modify that experiment to override the key name like before, except actually move the file, keep track of the state, and override the rename method. The problems with this approach are: 1) it's brittle because it depends on the internal directory and file naming conventions in Hadoop and Parquet; 2) it assumes (as seems to be currently the case) that the 'rename' call is done for all files from the driver. But it should do until there's a better solution in the Hadoop committer.
Re: Writing files to s3 with out temporary directory
I got it working. It's much faster. If someone else wants to try it, I:

1) Was already using the code from the Presto S3 Hadoop FileSystem implementation, modified to sever it from the rest of the Presto codebase.
2) Extended it and overrode the method "keyFromPath" so that any time the Path referred to a "_temporary" parquet file "part", it returned a "key" to the final location of the file.
3) Registered the filesystem through sparkContext.hadoopConfiguration by setting fs.s3.impl, fs.s3n.impl, and fs.s3a.impl.

I realize I'm risking a file corruption, but it's WAY faster than it was.
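To make step 2's key rewriting concrete, here is a hypothetical helper in the spirit of the overridden "keyFromPath": strip the "_temporary" segments so a part file's key points at its final location. The directory layout assumed here (_temporary/<appAttempt>/<taskAttempt>/ under the output directory) is an assumption about the Hadoop committer's conventions and may differ between versions; this is illustrative only, not the actual Presto-based code.

```scala
// Map a temporary part-file path to the key of its final location by dropping
// the "_temporary/<appAttempt>/<taskAttempt>" segments; non-temporary paths
// pass through unchanged.
def finalKey(path: String): String = {
  val parts = path.split("/").filterNot(_.isEmpty)
  val i = parts.indexOf("_temporary")
  if (i < 0) parts.mkString("/")                          // not a temporary path
  else (parts.take(i) ++ parts.drop(i + 3)).mkString("/") // splice out 3 segments
}
```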
Parquet divide by zero
Hello all, I've been hitting a divide by zero error in Parquet through Spark, detailed (and fixed) here: https://github.com/apache/incubator-parquet-mr/pull/102

Is anyone else hitting this error? I hit it frequently. It looks like the Parquet team is preparing to release 1.6.0 and, since they have been completely unresponsive, I'm assuming it's going to go out with this bug (without the fix). Other than the fact that the divide by zero mistake is obvious, perhaps the conditions under which it occurs are rare and I'm doing something wrong. Has anyone else hit this and, if so, have they resolved it?

Thanks
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-divide-by-zero-tp21406.html
1.3 Hadoop File System problem
I have code that works under 1.2.1, but when I upgraded to 1.3.0 it fails to find the s3 hadoop file system. I get "java.lang.IllegalArgumentException: Wrong FS: s3://[path to my file], expected: file:///" when I try to save a parquet file. This worked in 1.2.1. Has anyone else seen this? I'm running spark using "local[8]" so it's all internal. These are actually unit tests in our app that are failing now.

Thanks
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/1-3-Hadoop-File-System-problem-tp22207.html
Re: 1.3 Hadoop File System problem
Thanks Patrick and Michael for your responses. For anyone else who runs across this problem prior to 1.3.1 being released, I've been pointed to this Jira ticket that's scheduled for 1.3.1: https://issues.apache.org/jira/browse/SPARK-6351

Thanks again.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/1-3-Hadoop-File-System-problem-tp22207p5.html
Using Spark to add data to an existing Parquet file without a schema
Hello all, I've been trying to figure out how to add data to an existing Parquet file without having a schema. Spark has allowed me to load JSON and save it as a Parquet file, but I was wondering if anyone knows how to ADD/INSERT more data. I tried using a sql insert and that doesn't work. All of the examples assume a schema exists in the form of a serialization IDL and generated classes. I looked into the code and considered direct use of InsertIntoParquetTable (or a copy of it), but I was hoping someone had already solved the problem. Any guidance would be greatly appreciated.

Thanks
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-to-add-data-to-an-existing-Parquet-file-without-a-schema-tp13450.html
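(For readers finding this thread later: Spark versions after this question was asked grew a DataFrame writer with an append mode that covers this case. A sketch assuming that later API, with placeholder paths:)

```scala
// Load schemaless JSON and append the resulting rows to an existing Parquet
// directory; the schema is inferred from the JSON. Requires the DataFrame
// API (Spark 1.3+), which did not exist when this was originally asked.
val df = sqlContext.read.json("hdfs:///new-data.json")
df.write.mode("append").parquet("hdfs:///existing-parquet-dir")
```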
Re: Using Spark to add data to an existing Parquet file without a schema
Okay. Obviously I don't care about adding more files to the system, so: is there a way to point to an existing parquet file (directory) and seed the numbering of the individual "part-r-***.parquet" files (the value of "partition + offset") while preventing clashes with the existing parts? I mean, I can hack it by copying files into the same parquet directory and managing the file names externally, but this seems like a workaround. Is that the way others are doing it?

Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Spark-to-add-data-to-an-existing-Parquet-file-without-a-schema-tp13450p13499.html
Querying a parquet file in s3 with an ec2 install
Hello all, I've been wrestling with this problem all day and any suggestions would be greatly appreciated. I'm trying to test reading a parquet file that's stored in s3 using a spark cluster deployed on ec2. The following works in the spark shell when run completely locally on my own machine (i.e. no --master option passed to the spark-shell command):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val p = parquetFile("s3n://[bucket]/path-to-parquet-dir/")
p.registerAsTable("s")
sql("select count(*) from s").collect

I have an ec2 deployment of spark (tried version 1.0.2 and 1.1.0-rc4) using the standalone cluster manager and deployed with the spark-ec2 script. Running the same code in a spark shell connected to the cluster, it basically hangs on the select statement. The workers/slaves simply time out and restart every 30 seconds when they hit what appears to be an activity timeout, as if there's no activity from the spark-shell (based on what I see in the stderr logs for the job, I assume this is expected behavior when connected from a spark-shell that's sitting idle).
I see these messages about every 30 seconds:

14/09/08 17:43:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/7 is now EXITED (Command exited with code 1)
14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Executor app-20140908213842-0002/7 removed: Command exited with code 1
14/09/08 17:43:09 INFO AppClient$ClientActor: Executor added: app-20140908213842-0002/8 on worker-20140908183422-ip-10-60-107-194.ec2.internal-53445 (ip-10-60-107-194.ec2.internal:53445) with 2 cores
14/09/08 17:43:09 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140908213842-0002/8 on hostPort ip-10-60-107-194.ec2.internal:53445 with 2 cores, 4.0 GB RAM
14/09/08 17:43:09 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/8 is now RUNNING

Eventually it fails with:

14/09/08 17:44:16 INFO AppClient$ClientActor: Executor updated: app-20140908213842-0002/9 is now EXITED (Command exited with code 1)
14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Executor app-20140908213842-0002/9 removed: Command exited with code 1
14/09/08 17:44:16 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED
14/09/08 17:44:16 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/09/08 17:44:16 INFO TaskSchedulerImpl: Cancelling stage 1
14/09/08 17:44:16 INFO DAGScheduler: Failed to run collect at SparkPlan.scala:85
14/09/08 17:44:16 INFO SparkUI: Stopped Spark web UI at http://192.168.10.198:4040
14/09/08 17:44:16 INFO DAGScheduler: Stopping DAGScheduler
14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Shutting down all executors
14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
14/09/08 17:44:16 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
org.apache.spark.SparkException: Job aborted due to stage failure: Master removed our application: FAILED
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWo
Re: Querying a parquet file in s3 with an ec2 install
My apologies to the list. I replied to Manu's question and it went directly to him rather than the list. In case anyone else has this issue, here is my reply and Manu's reply to me. This also answers Ian's question.

---

Hi Manu,

The dataset is 7.5 million rows and 500 columns. In parquet form it's about 1.1 Gig. It was created with Spark and copied up to s3. It has about 4600 parts (which I'd also like to gain some control over). I can try a smaller dataset; however, it works when I run it locally, even with the file out on s3. It just takes a while. I can try copying it to HDFS first, but that won't help longer term.

Thanks
Jim

- Manu's response: -

I am pretty sure it is due to the number of parts you have. I have a parquet data set that is 250M rows and 924 columns and it is ~2500 files. I recommend creating a table in Hive with that data set and doing an insert overwrite so you can get a data set with more manageable files. Why I think it's the number of files is that I believe all (or a large part) of those files are read when you run sqlContext.parquetFile(), and the time it would take in s3 for that to happen is a lot, so something internally is timing out.

-Manu

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13790.html
Re: Querying a parquet file in s3 with an ec2 install
> Why I think it's the number of files is that I believe all (or a large part) of those files are read when you run sqlContext.parquetFile(), and the time it would take in s3 for that to happen is a lot, so something internally is timing out.

I'll create the parquet files with Drill instead of Spark, which will give me (somewhat) better control over the slice sizes, and see what happens. That said, this behavior seems wrong to me. First, exiting due to inactivity on a job seems like (perhaps?) the wrong fix to a former problem. Second, there IS activity if it's reading the slice headers, but the job is exiting anyway. So if this fixes the problem, the measure of "activity" seems wrong.

Ian and Manu, thanks for your help. I'll post back and let you know if that fixes it.

Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13791.html
Re: Querying a parquet file in s3 with an ec2 install
Okay. This seems to be either a code version issue or a communication issue. It works if I execute the spark shell from the master node. It doesn't work if I run it from my laptop and connect to the master node. I had opened the ports for the WebUI (8080) and the cluster manager (7077) on the master node; otherwise it fails much sooner. Do I need to open up the ports for the workers as well?

I used the spark-ec2 install script with --spark-version using both 1.0.2 and then again with the git hash tag that corresponds to 1.1.0rc4 (2f9b2bd7844ee8393dc9c319f4fefedf95f5e460). In both cases I rebuilt from source using the same codebase on my machine and moved the entire project into /root/spark (since to run the spark-shell it needs to match the same path as the install on ec2). Could I have missed something here?

Thanks
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13802.html
Network requirements between Driver, Master, and Slave
Hello all, I'm trying to run a Driver on my local network with a deployment on EC2 and it's not working. I was wondering if either the master or slave instances (in standalone) connect back to the driver program. I outlined the details of my observations in a previous post, but here is what I'm seeing: I have v1.1.0 installed (the new tag) on ec2 using the spark-ec2 script. I have the same version of the code built locally. I edited the master security group to allow inbound access from anywhere to 7077 and 8080. I see a connection take place. I see the workers fail with a timeout when any job is run. The master eventually removes the driver's job. I suppose this makes sense if there's a requirement for either the worker or the master to be on the same network as the driver. Is that the case?

Thanks
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Network-requirements-between-Driver-Master-and-Slave-tp13997.html
Re: Network requirements between Driver, Master, and Slave
Hi Akhil, Thanks! I guess in short that means the master (or slaves?) connects back to the driver. This seems like a really odd way to work, given the driver already needs to connect to the master on port 7077. I would have thought that if the driver could initiate a connection to the master, that would be all that's required. Can you describe what it is about the architecture that requires the master to connect back to the driver even when the driver initiates a connection to the master? Just curious.

Thanks anyway.
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Network-requirements-between-Driver-Master-and-Slave-tp13997p14086.html
Ending a job early
We have some very large datasets where the calculations converge on a result. Our current implementation allows us to track how quickly the calculations are converging and end the processing early. This can significantly speed up some of our processing. Is there a way to do the same thing in spark?

A trivial example might be a column average on a dataset. As we're 'aggregating' rows into columnar averages, I can track how fast these averages are moving and decide to stop after a low percentage of the rows has been processed, producing an estimate rather than an exact value. Within a partition, or better yet, within a worker across 'reduce' steps, is there a way to stop all of the aggregations and just continue on with reduces of already processed data?

Thanks
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Ending-a-job-early-tp17505.html
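The per-partition convergence check itself can be sketched without any Spark machinery. One hedged possibility is to run a loop like the following inside mapPartitions, so each worker stops consuming its partition once its running average stabilizes (the tolerance and minimum-row parameters are illustrative, and this only addresses the within-partition half of the question):

```scala
// Consume rows until the running average moves less than `tol` between
// consecutive rows (after at least `minRows` rows), then stop early and
// return the estimate instead of scanning the whole partition.
def convergentAverage(rows: Iterator[Double], tol: Double, minRows: Int): Double = {
  var sum = 0.0
  var n = 0L
  var prev = Double.NaN
  while (rows.hasNext) {
    sum += rows.next()
    n += 1
    val avg = sum / n
    if (n >= minRows && math.abs(avg - prev) < tol) return avg // converged: stop early
    prev = avg
  }
  if (n == 0) 0.0 else sum / n
}
```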
Wildly varying "aggregate" performance depending on code location
Hello all, I have a really strange thing going on. I have a test data set with 500K lines in a gzipped csv file. I have an array of "column processors," one for each column in the dataset. A Processor tracks aggregate state and has a method "process(v: String)". I'm calling (note: RDD.aggregate also takes a combOp that merges state across partitions, elided here as it was in my original code sample):

val processors: Array[Processor] = ...
sc.textFile(gzippedFileName).aggregate(processors)(
  (curState, row) => {
    row.split(",", -1).zipWithIndex.foreach { v => curState(v._2).process(v._1) }
    curState
  },
  ... /* combOp */
)

If the class definition for the Processor is in the same file as the driver, it runs in ~23 seconds. If I move the classes to a separate file in the same package, without ANY OTHER CHANGES, it goes to ~35 seconds. This doesn't make any sense to me. I can't even understand how the compiled class files could be any different in either case. Does anyone have an explanation for why this might be?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Wildly-varying-aggregate-performance-depending-on-code-location-tp18752.html
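For anyone puzzling over the call shape above: RDD.aggregate takes the zero value in one parameter list and a (seqOp, combOp) pair in a second. A minimal plain-Scala sketch of those two roles (no Spark, simulating per-partition folds with a toy running-average state rather than the Processor class above):

```scala
// seqOp folds rows into per-partition state; combOp merges the per-partition
// states into the final result.
case class Avg(sum: Double, n: Long) {
  def value: Double = if (n == 0) 0.0 else sum / n
}
val partitions = Seq(Seq(1.0, 2.0), Seq(3.0, 4.0))
val perPartition =
  partitions.map(_.foldLeft(Avg(0.0, 0L))((s, v) => Avg(s.sum + v, s.n + 1))) // seqOp
val total = perPartition.reduce((a, b) => Avg(a.sum + b.sum, a.n + b.n))      // combOp
// total.value is the average over all partitions: 2.5
```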
Re: Wildly varying "aggregate" performance depending on code location
Well, it looks like this is a scala problem after all. I loaded the file using pure scala and ran the exact same Processors without Spark, and I got 20 seconds (with the code in the same file as the 'main') vs 30 seconds (with the exact same code in a different file) on the 500K rows.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Wildly-varying-aggregate-performance-depending-on-code-location-tp18752p18772.html
Set worker log configuration when running "local[n]"
How do I set the log level when running "local[n]"? It ignores the log4j.properties file on my classpath. I also tried to set the spark home dir on the SparkConf using setSparkHome and made sure an appropriate log4j.properties file was in a "conf" subdirectory, and that didn't work either. I'm running with the current master.

Thanks
Jim

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-worker-log-configuration-when-running-local-n-tp18948.html
Re: Set worker log configuration when running "local[n]"
Actually, it looks like it's Parquet logging that I don't have control over. For some reason the parquet project decided to use java.util.logging with its own logging configuration.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-worker-log-configuration-when-running-local-n-tp18948p18951.html
How do I turn off Parquet logging in a worker?
I'm running a local spark master ("local[n]"). I cannot seem to turn off the parquet logging. I tried:

1) Setting a log4j.properties on the classpath.
2) Setting a log4j.properties file in a spark install conf directory and pointing to the install using setSparkHome.
3) Editing the log4j-default.properties file in the spark-core jar that I'm using.
4) Changing the JAVA_HOME/jre/lib/logging.properties (since Parquet uses java.util.logging).
5) Adding the following code as the first lines in my main:

java.util.logging.Logger.getLogger("parquet").addHandler(
  new java.util.logging.Handler() {
    def close(): Unit = {}
    def flush(): Unit = {}
    def publish(x: java.util.logging.LogRecord): Unit = {}
  })

NOTHING seems to change the default log level console output in parquet when it runs in a worker.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-tp18955.html
Re: How do I turn off Parquet logging in a worker?
This is a problem because of a bug in Spark in the current master (beyond the fact that Parquet uses java.util.logging). ParquetRelation.scala attempts to override the parquet logger but, at least currently (and if your application simply reads a parquet file before it does anything else with Parquet), the parquet.Log class hasn't been loaded yet. Therefore the code in ParquetRelation.enableLogForwarding has no effect. If you look at the code in parquet.Log, there's a static initializer that needs to be called prior to enableLogForwarding, or whatever enableLogForwarding does gets undone by that static initializer.

Adding:

Class.forName("parquet.Log")

as the first thing in my main fixed the problem. The "fix" would be to force the static initializer to get called in parquet.Log as part of enableLogForwarding. Anyone want a PR?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-tp18955p18958.html
Re: Set worker log configuration when running "local[n]"
Just to be complete, this is a problem in Spark that I worked around and detailed here: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-td18955.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Set-worker-log-configuration-when-running-local-n-tp18948p18959.html
Re: How do I turn off Parquet logging in a worker?
Jira: https://issues.apache.org/jira/browse/SPARK-4412 PR: https://github.com/apache/spark/pull/3271 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-turn-off-Parquet-logging-in-a-worker-tp18955p18977.html
Standard SQL tool access to SchemaRDD
Hello all, Is there a way to load an RDD in a small driver app and connect with a JDBC client and issue SQL queries against it? It seems the Thrift server only works with pre-existing Hive tables. Thanks Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-tp20197.html
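One approach (a sketch, assuming Spark 1.3+ with the spark-hive-thriftserver module on the classpath) is to register the RDD as a temporary table and then start the Thrift server inside the same driver, so the server's session can see the temp table. The table name and sample data here are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

object TempTableServer {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("temp-table-server"))
    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._

    // Register an in-memory RDD as a temporary table (hypothetical data).
    val df = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("key", "value")
    df.registerTempTable("my_rdd")

    // Expose this context's temp tables over JDBC (port 10000 by default);
    // a client can then connect with the Hive JDBC driver and query my_rdd.
    HiveThriftServer2.startWithContext(hiveContext)
  }
}
```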
Re: Standard SQL tool access to SchemaRDD
Thanks! I'll give it a try. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-tp20197p20202.html
No disk single pass RDD aggregation
Okay, I have an RDD that I want to run an aggregate over, but it insists on spilling to disk even though I structured the processing to require only a single pass. In other words, I can do all of my processing one entry in the RDD at a time without persisting anything. I set rdd.persist(StorageLevel.NONE) and it had no effect. When I run locally, my /tmp directory fills up with transient RDD data even though I never need the data again after the row's been processed. Is there a way to turn this off? Thanks Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723.html
Re: No disk single pass RDD aggregation
In case a little more information is helpful: the RDD is constructed using sc.textFile(fileUri) where the fileUri points to a ".gz" file (that's too big to fit on my disk, uncompressed). I do an rdd.persist(StorageLevel.NONE) and it seems to have no effect. This RDD is what I'm calling aggregate on, and I expect to use it only once; each row in the RDD never has to be revisited. The aggregate seqOp is modifying a "current state" and returning it, so there's no need to store the results of the seqOp on a row-by-row basis, and given that there's one partition, the combOp doesn't even need to be called (since there would be nothing to combine across partitions). Thanks for any help. Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20724.html
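The single-pass pattern described above can be sketched as follows (a minimal example with a hypothetical file path and a made-up statistic): seqOp folds each row into a mutable accumulator, and combOp merges per-partition accumulators, which is trivial when a gzip input yields a single partition.

```scala
// Mutable per-partition state, updated in place by seqOp.
case class Stats(var count: Long, var totalChars: Long)

val stats = sc.textFile("file:///data/huge.gz") // gzip => single partition
  .aggregate(Stats(0L, 0L))(
    // seqOp: fold one row into the running state; nothing per-row is retained.
    (acc, line) => { acc.count += 1; acc.totalChars += line.length; acc },
    // combOp: merge partition states (called once per extra partition).
    (a, b) => { a.count += b.count; a.totalChars += b.totalChars; a }
  )
```

Note the zero value Stats(0L, 0L) is serialized to each partition, so mutating the accumulator inside seqOp is safe within a partition.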
Re: No disk single pass RDD aggregation
Nvm. I'm going to post another question since this has to do with the way Spark handles sc.textFile with a file://.gz -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20725.html
Spark handling of a file://xxxx.gz Uri
Is there a way to get Spark to NOT repartition/shuffle/expand a sc.textFile(fileUri) when the URI is a gzipped file? Expanding a gzipped file should be thought of as a "transformation" and not an "action" (if the analogy is apt). There is no need to fully create and fill out an intermediate RDD with the expanded data when it can be done one row at a time. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-handling-of-a-file--gz-Uri-tp20726.html
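For what it's worth, this is in fact Spark's default behavior (as the later messages in these threads confirm): gzip is not a splittable codec, so sc.textFile on a .gz yields a single partition whose rows are decompressed and streamed lazily; nothing is materialized unless an operation shuffles or the RDD is cached. A quick way to check, sketched here with a hypothetical path:

```scala
val lines = sc.textFile("file:///data/huge.gz")

// gzip is not splittable, so the whole file is one partition.
println(lines.partitions.length) // expected: 1

// map/reduce here stream rows through the single partition one at a time;
// no intermediate RDD is written out unless something forces a shuffle.
val totalChars = lines.map(_.length.toLong).reduce(_ + _)
```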
How do I stop the automatic partitioning of my RDD?
I've been trying to figure out how to use Spark to do a simple aggregation without repartitioning and essentially creating fully instantiated intermediate RDDs, and it seems virtually impossible. I've now gone as far as writing my own single-partition RDD that wraps an Iterator[String] and calling aggregate() on it. Before any of my aggregation code executes, the entire Iterator is unwound and multiple partitions are created to be given to my aggregation. The task execution call stack includes: ShuffleMap.runTask, SortShuffleWriter.write, ExternalSorter.insertAll ... which is iterating over my entire RDD, repartitioning it, and collecting it into a SpillFile. How do I prevent this from happening? There's no need to do this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-stop-the-automatic-partitioning-of-my-RDD-tp20732.html
Re: How do I stop the automatic partitioning of my RDD?
Wow. I just realized what was happening, and it's all my fault. I have a library method that I wrote that presents the RDD, and I was actually repartitioning it myself. I feel pretty dumb. Sorry about that. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-stop-the-automatic-partitioning-of-my-RDD-tp20732p20735.html
Re: No disk single pass RDD aggregation
Hi, This was all my fault. It turned out I had a line of code buried in a library that did a "repartition." I used this library to wrap an RDD to present it to legacy code as a different interface. That's what was causing the data to spill to disk. The really stupid thing is it took me the better part of a day to find, and several misguided emails to this list (including the one that started this thread). Sorry about that. Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20763.html
Cluster Aware Custom RDD
Hello all, I have a custom RDD for fast loading of data from a non-partitioned source. The partitioning happens in the RDD implementation by pushing data from the source into queues picked up by the currently active partitions in worker threads. This works great on a multi-threaded single host (say with the master set to "local[x]"), but I'd like to run it distributed. However, I need to know not only which "slice" my partition is, but also which host (by sequence) it's on, so I can divide up the source by worker (host) and then run each worker multi-threaded. In other words, I need what effectively amounts to a two-tier slice identifier. I know this is probably unorthodox, but is there some way to get this information in the compute method or the deserialized Partition objects? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cluster-Aware-Custom-RDD-tp21196.html
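One place this information is available is inside compute() itself: SparkEnv.get.executorId identifies the executor JVM the task is running in, and the Partition (or TaskContext) carries the slice index. A minimal sketch of a custom RDD reporting both (the class names here are hypothetical, and mapping executor IDs to a stable host sequence is left to the application):

```scala
import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type carrying only its slice index.
class SlicePartition(val idx: Int) extends Partition {
  override def index: Int = idx
}

class TwoTierRDD(sc: SparkContext, numSlices: Int) extends RDD[String](sc, Nil) {

  override def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices)(new SlicePartition(_))

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    // Which executor JVM (and hence which worker) this task landed on.
    val executorId = SparkEnv.get.executorId
    val host = java.net.InetAddress.getLocalHost.getHostName
    // Which slice of the source this partition represents.
    Iterator(s"slice=${split.index} executor=$executorId host=$host")
  }
}
```

Note that executor IDs are assigned by the cluster manager, so deriving a dense "host sequence number" from them would still require coordination outside the RDD.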
Continuously running non-streaming jobs
Is there a way to create continuously-running, or at least continuously-loaded, jobs that can be 'invoked' rather than 'sent', to avoid the job creation overhead of a couple of seconds? I read through the following: http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-td2016.html Thanks. Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Continuously-running-non-streaming-jobs-tp4391.html
Re: Continuously running non-streaming jobs
Daniel, I'm new to Spark but I thought that thread hinted at the right answer. Thanks, Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Continuously-running-non-streaming-jobs-tp4391p4397.html
stdout in workers
I'm experimenting with a few things to understand how it all works. I took the JavaSparkPi example as a starting point and added a few System.out lines: one in the main body of the driver program (not inside any Functions), one in the mapper, and one in the reducer. I set up a simple "standalone" distribution with one master (no ZooKeeper) and one worker. The main println and the reducer println print from the driver program. The mapper one doesn't print in the shell I started the worker in, nor does it appear in any logs (in case stdout was redirected). However, the program executes (and the mapper executes on the worker) since I can see the job run in the logs and I get an answer. Where should I be looking? Thanks. Apologies if this is a dumb question - I searched for an answer but only found a reference to another list posting where the user thought his driver print statement should have printed on the worker. Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/stdout-in-workers-tp4537.html
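For anyone hitting the same thing: in standalone mode, each executor's stdout and stderr are redirected to files under the worker's work directory (SPARK_WORKER_DIR, which defaults to $SPARK_HOME/work), not to the worker's console. A quick way to find the mapper's println output, assuming the default layout (the app-* directory name below is illustrative):

```shell
# One directory per application, one subdirectory per executor.
ls "$SPARK_HOME/work/"

# println output from task code (mappers, reducers) run by each executor:
cat "$SPARK_HOME"/work/app-*/*/stdout
```

The same files are also linked from each worker's page in the master's web UI (port 8080 by default).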