Re: Core allocation is scattered
Hello, Below is my understanding. These are the default configuration parameters that the Spark job falls back on when they are not set explicitly at submit time:

# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1)
# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)

SPARK_EXECUTOR_INSTANCES -> the number of executors to start; this is the maximum number of executors a job can ask for from the cluster resource manager.
SPARK_EXECUTOR_CORES -> the number of cores per executor; the Spark TaskScheduler asks the cluster manager to allocate/reserve this many cores on each executor machine.
SPARK_EXECUTOR_MEMORY -> the maximum amount of RAM/memory required per executor.

All of these are requested by the TaskScheduler from the cluster manager (Spark standalone, YARN, Mesos, or Kubernetes, which is natively supported since Spark 2.3) before job execution actually starts.

Also, please note that the initial number of executor instances depends on "--num-executors", but when there is more data to process and "spark.dynamicAllocation.enabled" is set to true, Spark will dynamically add more executors; the starting point is governed by "spark.dynamicAllocation.initialExecutors", which defaults to "spark.dynamicAllocation.minExecutors". From the docs:

spark.dynamicAllocation.initialExecutors (default: spark.dynamicAllocation.minExecutors) — Initial number of executors to run if dynamic allocation is enabled. If `--num-executors` (or `spark.executor.instances`) is set and larger than this value, it will be used as the initial number of executors.

spark.executor.memory (default: 1g) — Amount of memory to use per executor process, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g).
spark.executor.cores (default: 1 in YARN mode; all the available cores on the worker in standalone and Mesos coarse-grained modes) — The number of cores to use on each executor. For more detail on standalone and Mesos coarse-grained modes, see this description <http://spark.apache.org/docs/latest/spark-standalone.html#Executors%20Scheduling>.

On Thu, Jul 25, 2019 at 5:54 PM Amit Sharma wrote:
> I have a cluster with 26 nodes having 16 cores each. I am running a spark
> job with 20 cores, but I did not understand why my application gets 1-2 cores
> on a couple of machines. Why does it not just run on two nodes, like node1=16 cores
> and node2=4 cores? Instead cores are allocated like node1=2, node2=1 ... node14=1.
> Is there any conf property I need to change? I know with dynamic allocation we can
> use the below, but without dynamic allocation is there any?
> --conf "spark.dynamicAllocation.maxExecutors=2"
>
> Thanks
> Amit

--
Regards,
Srikanth Sriram
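To make the parameters above concrete, here is a minimal sketch of setting the equivalent SparkConf properties programmatically (the app name and values are purely illustrative; these must be set before the SparkContext is created):

```scala
import org.apache.spark.SparkConf

// Illustrative values: ask the cluster manager for 4 executors,
// each with 4 cores and 4 GiB of heap.
val conf = new SparkConf()
  .setAppName("allocation-example")       // hypothetical app name
  .set("spark.executor.instances", "4")   // equivalent to --num-executors
  .set("spark.executor.cores", "4")       // equivalent to --executor-cores
  .set("spark.executor.memory", "4g")     // equivalent to --executor-memory
```

In practice the same values are more commonly passed as spark-submit flags (--num-executors, --executor-cores, --executor-memory) or set in spark-defaults.conf.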
Task failure to read input files
Hello, I'm running a Spark job on AWS EMR that reads many lzo files from an S3 bucket partitioned by date. Sometimes I see errors in the logs similar to

18/04/13 11:53:52 WARN TaskSetManager: Lost task 151177.0 in stage 43.0 (TID 1516123, ip-10-10-2-6.ec2.internal, executor 57): java.io.IOException: Corrupted uncompressed block
at com.hadoop.compression.lzo.LzopInputStream.verifyChecksums(LzopInputStream.java:219)
at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:284)
at com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:261)

I don't see the jobs fail, so I assume this task succeeded when it was retried. If the input file were actually corrupted, even the task retries should fail, and eventually the job would fail based on the "spark.task.maxFailures" config, right? Is there a way to make the Spark/Hadoop lzo library print the full file name when such failures happen, so that I can then manually check whether the file is indeed corrupted?

Thanks, Srikanth
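One way to narrow down which file a task was reading comes not from the lzo library itself but from Spark SQL: the input_file_name() function tags each row with its source file. A sketch, assuming the data can also be read through the DataFrame/Dataset API (the bucket path below is a placeholder, and this needs a running Spark cluster):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

val spark = SparkSession.builder().appName("FindCorruptFiles").getOrCreate()

// Tag every row with the file it came from, then force a scan per file;
// a file whose scan keeps failing is a corruption suspect.
val df = spark.read
  .textFile("s3://my-bucket/events/date=2018-04-13/*.lzo") // placeholder path
  .withColumn("source_file", input_file_name())

df.groupBy("source_file").count().show(false)
```

Separately, Spark 2.1+ has spark.sql.files.ignoreCorruptFiles to skip such files on DataFrame reads, though that silently drops data rather than reporting the file name.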
Spark ML DAG Pipelines
Hi Spark Experts, Can someone point me to some examples for non-linear (DAG) ML pipelines. That would be of great help. Thanks much in advance -Srikanth
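For later readers: Spark ML's Pipeline stores stages as a linear sequence, but the dataflow between stages forms a DAG through their input/output columns — independent branches transform different columns, and a VectorAssembler acts as the join node. A minimal sketch with made-up column names (needs a Spark cluster to actually fit):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, StringIndexer, Tokenizer, VectorAssembler}

// Branch 1: text -> words -> tf
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val tf        = new HashingTF().setInputCol("words").setOutputCol("tf")

// Branch 2: category -> categoryIdx (independent of branch 1)
val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIdx")

// Join node of the DAG: merge both branches into one feature vector
val assembler = new VectorAssembler()
  .setInputCols(Array("tf", "categoryIdx"))
  .setOutputCol("features")

// Stages must be listed in a topological order of the column DAG
val pipeline = new Pipeline().setStages(Array(tokenizer, tf, indexer, assembler))
```

The only constraint is that setStages receives the stages in a topological order of that column DAG; the branches themselves can be arbitrary.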
Spark streaming app leaking memory?
Hi, I have a Spark Streaming (Spark 2.1.0) app where I see these logs in the executor. Does this point to some memory leak?

17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size = 67108864 bytes, TID = 7752
17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size = 67108864 bytes, TID = 7740
17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size = 67108864 bytes, TID = 7764
17/05/16 15:11:13 WARN Executor: Managed memory leak detected; size = 67108864 bytes, TID = 7728
17/05/16 15:12:02 WARN Executor: 1 block locks were not released by TID = 7783: [rdd_1_15]
17/05/16 15:12:02 WARN Executor: 1 block locks were not released by TID = 7807: [rdd_1_39]

I notice that the "Managed memory leak" logs are not seen when I use G1GC.

Srikanth
Re: Spark streaming + kafka error with json library
Thanks for the tip. That worked. When would one use the assembly?

On Wed, Mar 29, 2017 at 7:13 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> Try depending on "spark-streaming-kafka-0-10_2.11" (not the assembly)
>
> On Wed, Mar 29, 2017 at 9:59 AM, Srikanth <srikanth...@gmail.com> wrote:
>> Hello,
>>
>> I'm trying to use the "org.json4s" % "json4s-native" library in a spark
>> streaming + kafka direct app. When I use the latest version of the lib
>> I get an error similar to this <https://github.com/json4s/json4s/issues/316>
>> The workaround suggested there is to use version 3.2.10, as Spark has a
>> hard dependency on this version.
>>
>> I forced this version in SBT with
>> dependencyOverrides += "org.json4s" %% "json4s-native" % "3.2.10"
>>
>> But now it seems to have some conflict with spark-streaming-kafka-0-10-assembly
>>
>> [error] (*:assembly) deduplicate: different file contents found in the following:
>> [error] C:\Users\stati\.ivy2\cache\org.apache.spark\spark-streaming-kafka-0-10-assembly_2.11\jars\spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class
>> [error] C:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-parser-combinators_2.11\bundles\scala-parser-combinators_2.11-1.0.4.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class
>>
>> DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling json4s-native.
>> Any idea how to resolve this? I'm using spark version 2.1.0
>>
>> Thanks,
>> Srikanth
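For later readers, the fix in sbt terms: depend on the plain spark-streaming-kafka-0-10 artifact rather than the assembly. The assembly jar bundles transitive classes such as scala-parser-combinators, which is what made sbt-assembly see the same class twice. A build.sbt sketch with the versions from this thread:

```scala
// build.sbt (sketch) -- the non-assembly artifact lets sbt resolve
// scala-parser-combinators once, instead of finding it both in your
// dependency graph and repackaged inside the assembly jar.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0",
  "org.json4s"       %% "json4s-native"              % "3.2.10"
)
```

As I understand it, the assembly artifact is mainly meant to be dropped onto a cluster's classpath as a single pre-bundled jar (e.g. via spark-submit --jars), not to be merged into your own fat jar.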
Spark streaming + kafka error with json library
Hello, I'm trying to use the "org.json4s" % "json4s-native" library in a spark streaming + kafka direct app. When I use the latest version of the lib I get an error similar to this <https://github.com/json4s/json4s/issues/316>. The workaround suggested there is to use version 3.2.10, as Spark has a hard dependency on this version.

I forced this version in SBT with
dependencyOverrides += "org.json4s" %% "json4s-native" % "3.2.10"

But now it seems to have some conflict with spark-streaming-kafka-0-10-assembly

[error] (*:assembly) deduplicate: different file contents found in the following:
[error] C:\Users\stati\.ivy2\cache\org.apache.spark\spark-streaming-kafka-0-10-assembly_2.11\jars\spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class
[error] C:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-parser-combinators_2.11\bundles\scala-parser-combinators_2.11-1.0.4.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class

DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling json4s-native. Any idea how to resolve this? I'm using spark version 2.1.0

Thanks,
Srikanth
Re: Exception in spark streaming + kafka direct app
This is running in YARN cluster mode. It was restarted automatically and continued fine. I was trying to see what went wrong. AFAIK there were no task failures. Nothing in the executor logs. The log I gave is from the driver.

After some digging, I did see in the kafka logs that there was a rebalance around this time. So will the driver fail and exit in such cases? I've seen drivers exit after a job has hit max retry attempts. This is different though, right?

Srikanth

On Tue, Feb 7, 2017 at 5:25 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> Does restarting after a few minutes solve the problem? Could be a
> transient issue that lasts long enough for spark task-level retries to all
> fail.
>
> On Tue, Feb 7, 2017 at 4:34 PM, Srikanth <srikanth...@gmail.com> wrote:
>> Hello,
>>
>> I had a spark streaming app that reads from kafka running for a few hours
>> after which it failed with the error
>>
>> 17/02/07 20:04:10 ERROR JobScheduler: Error generating jobs for time 148649785 ms
>> java.lang.IllegalStateException: No current assignment for partition mt_event-5
>> at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
>> at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
>> at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
>> at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>
>> 17/02/07 20:04:10 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalStateException: No current assignment for partition mt_event-5
>> java.lang.IllegalStateException: No current assignment for partition mt_event-5
>>
>> 17/02/07 20:05:00 WARN JobGenerator: Timed out while stopping the job generator (timeout = 5)
>>
>> The driver did not recover from this error and failed. The previous batch had run
>> 5 sec earlier. There are no indications in the logs that a rebalance happened.
>> As per the kafka admin, kafka cluster health was good when this happened and no
>> maintenance was being done.
>>
>> Any idea what could have gone wrong and why this is a fatal error?
>>
>> Regards,
>> Srikanth
Exception in spark streaming + kafka direct app
Hello, I had a spark streaming app that reads from kafka running for a few hours, after which it failed with the error

17/02/07 20:04:10 ERROR JobScheduler: Error generating jobs for time 148649785 ms
java.lang.IllegalStateException: No current assignment for partition mt_event-5
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)

17/02/07 20:04:10 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalStateException: No current assignment for partition mt_event-5
java.lang.IllegalStateException: No current assignment for partition mt_event-5

17/02/07 20:05:00 WARN JobGenerator: Timed out while stopping the job generator (timeout = 5)

The driver did not recover from this error and failed. The previous batch had run 5 sec earlier. There are no indications in the logs that a rebalance happened. As per the kafka admin, kafka cluster health was good when this happened and no maintenance was being done.

Any idea what could have gone wrong and why this is a fatal error?

Regards, Srikanth
Re: Spark 2.0 with Kafka 0.10 exception
Kafka 0.10.1 release separates poll() from heartbeat, so session.timeout.ms & max.poll.interval.ms can be set differently. I'll leave it to you on how to add this to the docs!

On Thu, Oct 20, 2016 at 1:41 PM, Cody Koeninger <c...@koeninger.org> wrote:
> Right on, I put in a PR to make a note of that in the docs.
>
> On Thu, Oct 20, 2016 at 12:13 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Yeah, setting those params helped.
> >
> > On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> 60 seconds for a batch is above the default settings in kafka related
> >> to heartbeat timeouts, so that might be related. Have you tried
> >> tweaking session.timeout.ms, heartbeat.interval.ms, or related configs?
> >>
> >> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> > Bringing this thread back as I'm seeing this exception on a production
> >> > kafka cluster.
> >> >
> >> > I have two Spark streaming apps reading the same topic. App1 has batch
> >> > interval 2 secs and app2 has 60 secs.
> >> > Both apps are running on the same cluster on similar hardware. I see this
> >> > exception only in app2 and fairly consistently.
> >> >
> >> > The difference I see between the apps is
> >> > App1: spark.streaming.kafka.maxRatePerPartition = 6000, batch interval 2 secs
> >> > App2: spark.streaming.kafka.maxRatePerPartition = 1, batch interval 60 secs
> >> >
> >> > All other kafka/spark related configs are the same for both apps.
> >> > spark.streaming.kafka.consumer.poll.ms = 4096
> >> > spark.streaming.backpressure.enabled = true
> >> >
> >> > Not sure if pre-fetching or caching is messing things up.
> >> >
> >> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingEventSplitProd mt_event 6 49091480 after polling for 4096
> >> > at scala.Predef$.assert(Predef.scala:170)
> >> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> >> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> >> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> >> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >> > at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)
> >> >
> >> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> >> That's not what I would have expected to happen with a lower cache
> >> >> setting, but in general disabling the cache isn't something you want
> >> >> to do with the new kafka consumer.
> >> >>
> >> >> As far as the original issue, are you seeing those polling errors
> >> >> intermittently, or consistently? From your description, it sounds
> >> >> like retry is working correctly.
> >> >>
> >> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> >> > Setting those two results in the below exception.
> >> >> > No. of executors < no. of partitions. Could that be triggering this?
> >> >> >
> >> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
> >> >> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> >> >> > multi-threaded access
> >> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
> >> >> >
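For later readers: with Kafka 0.10.1+ clients (where poll() and heartbeating are decoupled, as noted at the top of this thread), the relevant consumer settings can be passed straight through the kafkaParams map of the direct stream. A sketch with illustrative values only — tune them to your batch interval; the keys are standard Kafka consumer configs, and the bootstrap servers/group id are placeholders:

```scala
// Illustrative values: heartbeat well under the session timeout, and
// max.poll.interval.ms comfortably above a 60s batch interval.
val kafkaParams = Map[String, Object](
  "bootstrap.servers"     -> "broker1:9092",        // placeholder
  "group.id"              -> "app2-consumer-group", // placeholder
  "session.timeout.ms"    -> "30000",
  "heartbeat.interval.ms" -> "10000",
  "max.poll.interval.ms"  -> "300000"
)
```

This map is what gets handed to KafkaUtils.createDirectStream via ConsumerStrategies.Subscribe; spark.streaming.kafka.consumer.poll.ms is a Spark setting and goes on the SparkConf instead.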
Re: Spark 2.0 with Kafka 0.10 exception
Yeah, setting those params helped.

On Wed, Oct 19, 2016 at 1:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
> 60 seconds for a batch is above the default settings in kafka related
> to heartbeat timeouts, so that might be related. Have you tried
> tweaking session.timeout.ms, heartbeat.interval.ms, or related configs?
>
> On Wed, Oct 19, 2016 at 12:22 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Bringing this thread back as I'm seeing this exception on a production kafka
> > cluster.
> >
> > I have two Spark streaming apps reading the same topic. App1 has batch
> > interval 2 secs and app2 has 60 secs.
> > Both apps are running on the same cluster on similar hardware. I see this
> > exception only in app2 and fairly consistently.
> >
> > The difference I see between the apps is
> > App1: spark.streaming.kafka.maxRatePerPartition = 6000, batch interval 2 secs
> > App2: spark.streaming.kafka.maxRatePerPartition = 1, batch interval 60 secs
> >
> > All other kafka/spark related configs are the same for both apps.
> > spark.streaming.kafka.consumer.poll.ms = 4096
> > spark.streaming.backpressure.enabled = true
> >
> > Not sure if pre-fetching or caching is messing things up.
> >
> > 16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingEventSplitProd mt_event 6 49091480 after polling for 4096
> > at scala.Predef$.assert(Predef.scala:170)
> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> > at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)
> >
> > On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> That's not what I would have expected to happen with a lower cache
> >> setting, but in general disabling the cache isn't something you want
> >> to do with the new kafka consumer.
> >>
> >> As far as the original issue, are you seeing those polling errors
> >> intermittently, or consistently? From your description, it sounds
> >> like retry is working correctly.
> >>
> >> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> > Setting those two results in the below exception.
> >> > No. of executors < no. of partitions. Could that be triggering this?
> >> >
> >> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
> >> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> >> > multi-threaded access
> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
> >> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> >> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> >> > at java.util.HashMap.putVal(Unknown Source)
> >> > at java.util.HashMap.put(Unknown Source)
> >> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
> >> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
> >> > at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> >> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> >
Re: Spark 2.0 with Kafka 0.10 exception
Bringing this thread back as I'm seeing this exception on a production kafka cluster.

I have two Spark streaming apps reading the same topic. App1 has batch interval 2 secs and app2 has 60 secs. Both apps are running on the same cluster on similar hardware. I see this exception only in app2 and fairly consistently.

The difference I see between the apps is
App1: spark.streaming.kafka.maxRatePerPartition = 6000, batch interval 2 secs
App2: spark.streaming.kafka.maxRatePerPartition = 1, batch interval 60 secs

All other kafka/spark related configs are the same for both apps.
spark.streaming.kafka.consumer.poll.ms = 4096
spark.streaming.backpressure.enabled = true

Not sure if pre-fetching or caching is messing things up.

16/10/19 14:32:04 WARN TaskSetManager: Lost task 2.0 in stage 1780.0 (TID 12541, ip-10-150-20-200.ec2.internal): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingEventSplitProd mt_event 6 49091480 after polling for 4096
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$21.next(Iterator.scala:838)

On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
> That's not what I would have expected to happen with a lower cache
> setting, but in general disabling the cache isn't something you want
> to do with the new kafka consumer.
>
> As far as the original issue, are you seeing those polling errors
> intermittently, or consistently? From your description, it sounds
> like retry is working correctly.
>
> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Setting those two results in the below exception.
> > No. of executors < no. of partitions. Could that be triggering this?
> >
> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> > multi-threaded access
> > at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> > at java.util.HashMap.putVal(Unknown Source)
> > at java.util.HashMap.put(Unknown Source)
> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
> > at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > at java.lang.Thread.run(Unknown Source)
> >
> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> you could try setting
> >>
> >> spark.streaming.kafka.consumer.cache.initialCapacity
Re: Spark with S3 DirectOutputCommitter
Thanks Steve! We are already using HDFS as an intermediate store. This is for the last stage of processing, which has to put data in S3. The output is partitioned by 3 fields, like .../field1=111/field2=999/date=yyyy-MM-dd/*. Given that there are 100s of folders and 1000s of subfolders and part files, renaming from _temporary is just not practical in S3. I guess we have to add another stage with S3DistCp?

Srikanth

On Sun, Sep 11, 2016 at 2:34 PM, Steve Loughran <ste...@hortonworks.com> wrote:
>
> On 9 Sep 2016, at 21:54, Srikanth <srikanth...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried
> a few configs and none of them seem to work.
> > Output always creates a _temporary directory. Rename is killing performance.
> > I read some notes about DirectOutputCommitter causing problems with
> speculation turned on. Was this option removed entirely?
>
> Spark turns off any committer with the word "direct" in its name if
> speculation==true. Concurrency, see.
>
> Even on non-speculative execution, the trouble with the direct options is
> that executor/job failures can leave incomplete/inconsistent work around
> — and the things downstream wouldn't even notice.
>
> There's work underway to address things, work which requires a consistent
> metadata store alongside S3 (HADOOP-13345: S3Guard).
>
> For now: stay with the file output committer
>
> hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
> hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
>
> Even better: use HDFS as the intermediate store for work, only do a bulk
> upload at the end.
>
> > val spark = SparkSession.builder()
> >   .appName("MergeEntities")
> >   .config("spark.sql.warehouse.dir", mergeConfig.getString("sparkSqlWarehouseDir"))
> >   .config("fs.s3a.buffer.dir", "/tmp")
> >   .config("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
> >   .config("mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
> >   .config("mapreduce.use.directfileoutputcommitter", "true")
> >   //.config("spark.sql.sources.outputCommitterClass", classOf[DirectOutputCommitter].getCanonicalName)
> >   .getOrCreate()
> >
> > Srikanth
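For later readers, the committer settings Steve suggests can be wired into the same builder pattern the thread already uses. A sketch only — the spark.hadoop. prefix is how per-job Hadoop properties are usually passed through Spark, and the exact behavior should be verified against your Hadoop version:

```scala
import org.apache.spark.sql.SparkSession

// Algorithm v2 renames each task's output directly into the destination,
// skipping the second job-level rename; skipping cleanup avoids a slow
// recursive delete of _temporary on S3.
val spark = SparkSession.builder()
  .appName("MergeEntities")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped", "true")
  .getOrCreate()
```

The same two properties can equally be set via spark-submit --conf or in core-site.xml (without the spark.hadoop. prefix in the latter case).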
Spark with S3 DirectOutputCommitter
Hello, I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried a few configs and none of them seem to work. Output always creates a _temporary directory. Rename is killing performance. I read some notes about DirectOutputCommitter causing problems with speculation turned on. Was this option removed entirely?

val spark = SparkSession.builder()
  .appName("MergeEntities")
  .config("spark.sql.warehouse.dir", mergeConfig.getString("sparkSqlWarehouseDir"))
  .config("fs.s3a.buffer.dir", "/tmp")
  .config("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
  .config("mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
  .config("mapreduce.use.directfileoutputcommitter", "true")
  //.config("spark.sql.sources.outputCommitterClass", classOf[DirectOutputCommitter].getCanonicalName)
  .getOrCreate()

Srikanth
"Job duration" and "Processing time" don't match
Hello, I was looking at the Spark streaming UI and noticed a big difference between "Processing time" and "Job duration". Processing time/Output Op duration is shown as 50s, but the sum of all job durations is ~25s. What is causing this difference? Based on the logs I know that the batch actually took 50s.

The job that is taking most of the time is

joinRDD.toDS()
  .write.format("com.databricks.spark.csv")
  .mode(SaveMode.Append)
  .options(Map("mode" -> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "false"))
  .partitionBy("entityId", "regionId", "eventDate")
  .save(outputPath)

Removing SaveMode.Append really speeds things up, and the mismatch between job duration and processing time also disappears. I'm not able to explain what is causing this though.

Srikanth
Re: Spark 2.0 with Kafka 0.10 exception
Yea, disabling the cache was not going to be my permanent solution either. I was going to ask how big an overhead that is. It happens intermittently, and each time it happens the retry is successful.

Srikanth

On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
> That's not what I would have expected to happen with a lower cache
> setting, but in general disabling the cache isn't something you want
> to do with the new kafka consumer.
>
> As far as the original issue, are you seeing those polling errors
> intermittently, or consistently? From your description, it sounds
> like retry is working correctly.
>
> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth <srikanth...@gmail.com> wrote:
> > Setting those two results in the below exception.
> > No. of executors < no. of partitions. Could that be triggering this?
> >
> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> > multi-threaded access
> > at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> > at java.util.HashMap.putVal(Unknown Source)
> > at java.util.HashMap.put(Unknown Source)
> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
> > at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > at java.lang.Thread.run(Unknown Source)
> >
> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >> you could try setting
> >>
> >> spark.streaming.kafka.consumer.cache.initialCapacity
> >>
> >> spark.streaming.kafka.consumer.cache.maxCapacity
> >>
> >> to 1
> >>
> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote:
> >> > I had a look at the executor logs and noticed that this exception happens
> >> > only when using the cached consumer.
> >> > Every retry is successful. This is consistent.
> >> > One possibility is that the cached consumer is causing the failure, as the
> >> > retry clears it.
> >> > Is there a way to disable the cache and test this?
> >> > Again, kafkacat is running fine on the same node.
> >> >
> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851)
> >> >
> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2 offsets 57079162 -> 57090330
> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID
Re: Spark 2.0 with Kafka 0.10 exception
Setting those two results in below exception. No.of executors < no.of partitions. Could that be triggering this? 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9) java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430) at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128) at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source) at java.util.HashMap.putVal(Unknown Source) at java.util.HashMap.put(Unknown Source) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(KafkaRDD.scala:210) at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger <c...@koeninger.org> wrote: > you could try setting > > spark.streaming.kafka.consumer.cache.initialCapacity > > spark.streaming.kafka.consumer.cache.maxCapacity > > to 1 > > On Wed, Sep 7, 2016 at 2:02 PM, Srikanth <srikanth...@gmail.com> wrote: > > I had a look at the executor logs and noticed that this exception happens > > only when using the cached consumer. > > Every retry is successful. This is consistent. > > One possibility is that the cached consumer is causing the failure as > retry > > clears it. > > Is there a way to disable cache and test this? > > Again, kafkacat is running fine on the same node. > > > > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID > 7849) > > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID > 7851 > > > > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2 > > offsets 57079162 -> 57090330 > > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0 > > offsets 57098866 -> 57109957 > > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID > > 7851). 1030 bytes result sent to driver > > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 > (TID > > 7849) > > java.lang.AssertionError: assertion failed: Failed to get records for > > spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling > for > > 2048 > > at scala.Predef$.assert(Predef.scala:170) > > at > > org.apache.spark.streaming.kafka010.CachedKafkaConsumer. 
> get(CachedKafkaConsumer.scala:74) > > at > > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next( > KafkaRDD.scala:227) > > at > > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next( > KafkaRDD.scala:193) > > > > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task > 7854 > > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID > 7854) > > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0 > > offsets 57098866 -> 57109957 > > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for > > spark-executor-StreamingPixelCount1 mt_event 0 57098866 > > > > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID > > 7854). 1103 bytes result sent to driver > > > > > > > > On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote: > >> > >> Thanks Cody. Setting poll timeout helped. > >> Our
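The cache-capacity workaround Cody suggests above amounts to two configuration entries. A minimal sketch of setting them when building the SparkConf (the app name is a placeholder; the property names are as given in the thread):

```scala
import org.apache.spark.SparkConf

// Shrink the per-executor cached-consumer pool to a single entry, to test
// whether the CachedKafkaConsumer eviction path is what triggers the
// ConcurrentModificationException above.
val conf = new SparkConf()
  .setAppName("kafka-cache-test") // placeholder
  .set("spark.streaming.kafka.consumer.cache.initialCapacity", "1")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "1")
```

The same settings can equally be passed as `--conf` flags to spark-submit.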
Re: Spark 2.0 with Kafka 0.10 exception
I had a look at the executor logs and noticed that this exception happens only when using the cached consumer. Every retry is successful. This is consistent. One possibility is that the cached consumer is causing the failure as retry clears it. Is there a way to disable cache and test this? Again, kafkacat is running fine on the same node.

16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851)

16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2 offsets 57079162 -> 57090330
16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID 7851). 1030 bytes result sent to driver
16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID 7849)
java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for 2048
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)

16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0 offsets 57098866 -> 57109957
16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for spark-executor-StreamingPixelCount1 mt_event 0 57098866

16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID 7854). 1103 bytes result sent to driver

On Wed, Aug 24, 2016 at 2:13 PM, Srikanth <srikanth...@gmail.com> wrote:
> Thanks Cody. Setting poll timeout helped.
> Our network is fine but brokers are not fully provisioned in test cluster. > But there isn't enough load to max out on broker capacity. > Curious that kafkacat running on the same node doesn't have any issues. > > Srikanth > > On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <c...@koeninger.org> > wrote: > >> You can set that poll timeout higher with >> >> spark.streaming.kafka.consumer.poll.ms >> >> but half a second is fairly generous. I'd try to take a look at >> what's going on with your network or kafka broker during that time. >> >> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <srikanth...@gmail.com> wrote: >> > Hello, >> > >> > I'm getting the below exception when testing Spark 2.0 with Kafka 0.10. >> > >> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0 >> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13 >> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for >> >> spark-executor-example mt_event 0 15782114 >> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator >> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group >> >> spark-executor-example. >> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 >> (TID >> >> 6) >> >> java.lang.AssertionError: assertion failed: Failed to get records for >> >> spark-executor-example mt_event 0 15782114 after polling for 512 >> >> at scala.Predef$.assert(Predef.scala:170) >> >> at >> >> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get( >> CachedKafkaConsumer.scala:74) >> >> at >> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterato >> r.next(KafkaRDD.scala:227) >> >> at >> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterato >> r.next(KafkaRDD.scala:193) >> >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) >> > >> > >> > I get this error intermittently. Sometimes a few batches are scheduled >> and >> > run fine. Then I get this error. 
>> > kafkacat is able to fetch from this topic continuously. >> > >> > Full exception is here -- >> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767 >> > >> > Srikanth >> > >
Re: Reset auto.offset.reset in Kafka 0.10 integ
Yes that's right. I understand this is a data loss. The restart doesn't have to be all that silent. It requires us to set a flag. I thought auto.offset.reset is that flag. But there isn't much I can do at this point given that retention has cleaned things up. The app has to start. Let admins address the data loss on the side. On Wed, Sep 7, 2016 at 12:15 PM, Cody Koeninger <c...@koeninger.org> wrote: > Just so I'm clear on what's happening... > > - you're running a job that auto-commits offsets to kafka. > - you stop that job for longer than your retention > - you start that job back up, and it errors because the last committed > offset is no longer available > - you think that instead of erroring, the job should silently restart > based on the value of auto.offset.reset > > Is that accurate? > > > On Wed, Sep 7, 2016 at 10:44 AM, Srikanth <srikanth...@gmail.com> wrote: > > My retention is 1d which isn't terribly low. The problem is every time I > > restart after retention expiry, I get this exception instead of honoring > > auto.offset.reset. > > It isn't a corner case where retention expired after driver created a > batch. > > Its easily reproducible and consistent. > > > > On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <c...@koeninger.org> > wrote: > >> > >> You don't want auto.offset.reset on executors, you want executors to > >> do what the driver told them to do. Otherwise you're going to get > >> really horrible data inconsistency issues if the executors silently > >> reset. > >> > >> If your retention is so low that retention gets expired in between > >> when the driver created a batch with a given starting offset, and when > >> an executor starts to process that batch, you're going to have > >> problems. > >> > >> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth...@gmail.com> wrote: > >> > This isn't a production setup. We kept retention low intentionally. 
> >> > My original question was why I got the exception instead of it using > >> > auto.offset.reset on restart? > >> > > >> > > >> > > >> > > >> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org> > >> > wrote: > >> >> > >> >> If you leave enable.auto.commit set to true, it will commit offsets > to > >> >> kafka, but you will get undefined delivery semantics. > >> >> > >> >> If you just want to restart from a fresh state, the easiest thing to > >> >> do is use a new consumer group name. > >> >> > >> >> But if that keeps happening, you should look into why your retention > >> >> is not sufficient. > >> >> > >> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> > wrote: > >> >> > You are right. I got confused as its all part of same log when > >> >> > running > >> >> > from > >> >> > IDE. > >> >> > I was looking for a good guide to read to understand the this > integ. > >> >> > > >> >> > I'm not managing offset on my own. I've not enabled checkpoint for > my > >> >> > tests. > >> >> > I assumed offsets will be stored in kafka by default. > >> >> > > >> >> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] ( > >> >> > ssc, PreferConsistent, SubscribePattern[Array[Byte], > >> >> > Array[Byte]](pattern, kafkaParams) ) > >> >> > > >> >> >* @param offsets: offsets to begin at on initial startup. If no > >> >> > offset > >> >> > is given for a > >> >> >* TopicPartition, the committed offset (if applicable) or kafka > >> >> > param > >> >> >* auto.offset.reset will be used. > >> >> > > >> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values: > >> >> > enable.auto.commit = true > >> >> > auto.offset.reset = latest > >> >> > > >> >> > Srikanth > >> >> > > >> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org > > > >> >> > wrote: > >> >> >> > >> >> >> Seems like you're confused about the purpose of that line of code, > >> >> >> it > >> >> >
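Cody's suggestion further down this thread — restart under a new consumer group so that no stored offsets exist and auto.offset.reset takes effect — can be sketched as the kafkaParams for the 0.10 direct stream. The broker address and group name are placeholders:

```scala
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// With a fresh group.id there are no committed offsets left to fall outside
// the retained range, so auto.offset.reset decides where to start
// (accepting the data loss discussed above).
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092",            // placeholder
  "key.deserializer"   -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id"           -> "my-app-fresh-group",      // new, unused group name
  "auto.offset.reset"  -> "earliest",                // or "latest"
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```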
Re: Reset auto.offset.reset in Kafka 0.10 integ
My retention is 1d which isn't terribly low. The problem is every time I restart after retention expiry, I get this exception instead of honoring auto.offset.reset. It isn't a corner case where retention expired after driver created a batch. Its easily reproducible and consistent. On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger <c...@koeninger.org> wrote: > You don't want auto.offset.reset on executors, you want executors to > do what the driver told them to do. Otherwise you're going to get > really horrible data inconsistency issues if the executors silently > reset. > > If your retention is so low that retention gets expired in between > when the driver created a batch with a given starting offset, and when > an executor starts to process that batch, you're going to have > problems. > > On Tue, Sep 6, 2016 at 2:30 PM, Srikanth <srikanth...@gmail.com> wrote: > > This isn't a production setup. We kept retention low intentionally. > > My original question was why I got the exception instead of it using > > auto.offset.reset on restart? > > > > > > > > > > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org> > wrote: > >> > >> If you leave enable.auto.commit set to true, it will commit offsets to > >> kafka, but you will get undefined delivery semantics. > >> > >> If you just want to restart from a fresh state, the easiest thing to > >> do is use a new consumer group name. > >> > >> But if that keeps happening, you should look into why your retention > >> is not sufficient. > >> > >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote: > >> > You are right. I got confused as its all part of same log when running > >> > from > >> > IDE. > >> > I was looking for a good guide to read to understand the this integ. > >> > > >> > I'm not managing offset on my own. I've not enabled checkpoint for my > >> > tests. > >> > I assumed offsets will be stored in kafka by default. 
> >> > > >> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] ( > >> > ssc, PreferConsistent, SubscribePattern[Array[Byte], > >> > Array[Byte]](pattern, kafkaParams) ) > >> > > >> >* @param offsets: offsets to begin at on initial startup. If no > >> > offset > >> > is given for a > >> >* TopicPartition, the committed offset (if applicable) or kafka > param > >> >* auto.offset.reset will be used. > >> > > >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values: > >> > enable.auto.commit = true > >> > auto.offset.reset = latest > >> > > >> > Srikanth > >> > > >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org> > >> > wrote: > >> >> > >> >> Seems like you're confused about the purpose of that line of code, it > >> >> applies to executors, not the driver. The driver is responsible for > >> >> determining offsets. > >> >> > >> >> Where are you storing offsets, in Kafka, checkpoints, or your own > >> >> store? > >> >> Auto offset reset won't be used if there are stored offsets. > >> >> > >> >> > >> >> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote: > >> >>> > >> >>> Hi, > >> >>> > >> >>> Upon restarting my Spark Streaming app it is failing with error > >> >>> > >> >>> Exception in thread "main" org.apache.spark.SparkException: Job > >> >>> aborted > >> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most > recent > >> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost): > >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: > Offsets > >> >>> out of > >> >>> range with no configured reset policy for partitions: > >> >>> {mt-event-2=1710706} > >> >>> > >> >>> It is correct that the last read offset was deleted by kafka due to > >> >>> retention period expiry. 
> >> >>> I've set auto.offset.reset in my app but it is getting reset here > >> >>> > >> >>> > >> >>> https://github.com/apache/spark/blob/master/external/ > kafka-0-10/src/main/scala/org/apache/spark/streaming/ > kafka010/KafkaUtils.scala#L160 > >> >>> > >> >>> How to force it to restart in this case (fully aware of potential > data > >> >>> loss)? > >> >>> > >> >>> Srikanth > >> > > >> > > > > > >
Re: Reset auto.offset.reset in Kafka 0.10 integ
This isn't a production setup. We kept retention low intentionally. My original question was why I got the exception instead of it using auto.offset.reset on restart? On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger <c...@koeninger.org> wrote: > If you leave enable.auto.commit set to true, it will commit offsets to > kafka, but you will get undefined delivery semantics. > > If you just want to restart from a fresh state, the easiest thing to > do is use a new consumer group name. > > But if that keeps happening, you should look into why your retention > is not sufficient. > > On Tue, Sep 6, 2016 at 9:39 AM, Srikanth <srikanth...@gmail.com> wrote: > > You are right. I got confused as its all part of same log when running > from > > IDE. > > I was looking for a good guide to read to understand the this integ. > > > > I'm not managing offset on my own. I've not enabled checkpoint for my > tests. > > I assumed offsets will be stored in kafka by default. > > > > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] ( > > ssc, PreferConsistent, SubscribePattern[Array[Byte], > > Array[Byte]](pattern, kafkaParams) ) > > > >* @param offsets: offsets to begin at on initial startup. If no > offset > > is given for a > >* TopicPartition, the committed offset (if applicable) or kafka param > >* auto.offset.reset will be used. > > > > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values: > > enable.auto.commit = true > > auto.offset.reset = latest > > > > Srikanth > > > > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger <c...@koeninger.org> > wrote: > >> > >> Seems like you're confused about the purpose of that line of code, it > >> applies to executors, not the driver. The driver is responsible for > >> determining offsets. > >> > >> Where are you storing offsets, in Kafka, checkpoints, or your own store? > >> Auto offset reset won't be used if there are stored offsets. 
> >> > >> > >> On Sep 2, 2016 14:58, "Srikanth" <srikanth...@gmail.com> wrote: > >>> > >>> Hi, > >>> > >>> Upon restarting my Spark Streaming app it is failing with error > >>> > >>> Exception in thread "main" org.apache.spark.SparkException: Job > aborted > >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent > >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost): > >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets > out of > >>> range with no configured reset policy for partitions: > {mt-event-2=1710706} > >>> > >>> It is correct that the last read offset was deleted by kafka due to > >>> retention period expiry. > >>> I've set auto.offset.reset in my app but it is getting reset here > >>> > >>> https://github.com/apache/spark/blob/master/external/ > kafka-0-10/src/main/scala/org/apache/spark/streaming/ > kafka010/KafkaUtils.scala#L160 > >>> > >>> How to force it to restart in this case (fully aware of potential data > >>> loss)? > >>> > >>> Srikanth > > > > >
Reset auto.offset.reset in Kafka 0.10 integ
Hi,

Upon restarting my Spark Streaming app it is failing with this error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 6, localhost): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {mt-event-2=1710706}

It is correct that the last read offset was deleted by Kafka due to retention period expiry. I've set auto.offset.reset in my app but it is getting reset here:

https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160

How to force it to restart in this case (fully aware of potential data loss)?

Srikanth
Re: Spark 2.0 with Kafka 0.10 exception
Thanks Cody. Setting poll timeout helped. Our network is fine but brokers are not fully provisioned in test cluster. But there isn't enough load to max out on broker capacity. Curious that kafkacat running on the same node doesn't have any issues. Srikanth On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger <c...@koeninger.org> wrote: > You can set that poll timeout higher with > > spark.streaming.kafka.consumer.poll.ms > > but half a second is fairly generous. I'd try to take a look at > what's going on with your network or kafka broker during that time. > > On Tue, Aug 23, 2016 at 4:44 PM, Srikanth <srikanth...@gmail.com> wrote: > > Hello, > > > > I'm getting the below exception when testing Spark 2.0 with Kafka 0.10. > > > >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0 > >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13 > >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for > >> spark-executor-example mt_event 0 15782114 > >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator > >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group > >> spark-executor-example. > >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 > (TID > >> 6) > >> java.lang.AssertionError: assertion failed: Failed to get records for > >> spark-executor-example mt_event 0 15782114 after polling for 512 > >> at scala.Predef$.assert(Predef.scala:170) > >> at > >> org.apache.spark.streaming.kafka010.CachedKafkaConsumer. > get(CachedKafkaConsumer.scala:74) > >> at > >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next( > KafkaRDD.scala:227) > >> at > >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next( > KafkaRDD.scala:193) > >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > > > > > > I get this error intermittently. Sometimes a few batches are scheduled > and > > run fine. Then I get this error. 
> > kafkacat is able to fetch from this topic continuously. > > > > Full exception is here -- > > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767 > > > > Srikanth >
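The fix that worked here is a single setting. A sketch of raising it on the SparkConf (the 10-second value is illustrative — the thread only says the timeout was raised above the roughly half-second value seen in the assertion):

```scala
import org.apache.spark.SparkConf

// Give the executor-side KafkaConsumer more time per poll before the
// "Failed to get records ... after polling for 512" assertion fires.
val conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.poll.ms", "10000") // illustrative value
```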
Spark 2.0 with Kafka 0.10 exception
Hello,

I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.

> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for spark-executor-example mt_event 0 15782114
> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator 10.150.254.161:9233 (id: 2147483646 rack: null) for group spark-executor-example.
> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 6)
> java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-example mt_event 0 15782114 after polling for 512
> at scala.Predef$.assert(Predef.scala:170)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

I get this error intermittently. Sometimes a few batches are scheduled and run fine. Then I get this error. kafkacat is able to fetch from this topic continuously.

Full exception is here -- https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767

Srikanth
Re: Rebalancing when adding kafka partitions
Yes, SubscribePattern detects new partitions. Also, it has a comment saying

> Subscribe to all topics matching specified pattern to get dynamically
> assigned partitions.
> * The pattern matching will be done periodically against topics existing
> at the time of check.
> * @param pattern pattern to subscribe to
> * @param kafkaParams Kafka

Who does the new partition discovery? The underlying kafka consumer or spark-streaming-kafka-0-10-assembly?

Srikanth

On Fri, Aug 12, 2016 at 5:15 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Hrrm, that's interesting. Did you try with subscribe pattern, out of curiosity?
>
> I haven't tested repartitioning on the underlying new Kafka consumer, so its possible I misunderstood something.
> On Aug 12, 2016 2:47 PM, "Srikanth" <srikanth...@gmail.com> wrote:
>
>> I did try a test with spark 2.0 + spark-streaming-kafka-0-10-assembly.
>> Partition was increased using "bin/kafka-topics.sh --alter" after spark job was started.
>> I don't see messages from new partitions in the DStream.
>>
>> KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
>>> ssc, PreferConsistent, Subscribe[Array[Byte], Array[Byte]](topics, kafkaParams) )
>>> .map(r => (r.key(), r.value()))
>>
>> Also, no.of partitions did not increase too.
>>
>>> dataStream.foreachRDD( (rdd, curTime) => {
>>> logger.info(s"rdd has ${rdd.getNumPartitions} partitions.")
>>
>> Should I be setting some parameter/config? Is the doc for new integ available?
>>
>> Thanks,
>> Srikanth
>> On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>> No, restarting from a checkpoint won't do it, you need to re-define the stream.
>>> >>> Here's the jira for the 0.10 integration >>> >>> https://issues.apache.org/jira/browse/SPARK-12177 >>> >>> I haven't gotten docs completed yet, but there are examples at >>> >>> https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10 >>> >>> On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <srikanth...@gmail.com> wrote: >>> > In Spark 1.x, if we restart from a checkpoint, will it read from new >>> > partitions? >>> > >>> > If you can, pls point us to some doc/link that talks about Kafka 0.10 >>> integ >>> > in Spark 2.0. >>> > >>> > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <c...@koeninger.org> >>> wrote: >>> >> >>> >> For the integration for kafka 0.8, you are literally starting a >>> >> streaming job against a fixed set of topicapartitions, It will not >>> >> change throughout the job, so you'll need to restart the spark job if >>> >> you change kafka partitions. >>> >> >>> >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe >>> >> or subscribepattern, it should pick up new partitions as they are >>> >> added. >>> >> >>> >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <srikanth...@gmail.com> >>> wrote: >>> >> > Hello, >>> >> > >>> >> > I'd like to understand how Spark Streaming(direct) would handle >>> Kafka >>> >> > partition addition? >>> >> > Will a running job be aware of new partitions and read from it? >>> >> > Since it uses Kafka APIs to query offsets and offsets are handled >>> >> > internally. >>> >> > >>> >> > Srikanth >>> > >>> > >>> >> >>
Re: Rebalancing when adding kafka partitions
I did try a test with spark 2.0 + spark-streaming-kafka-0-10-assembly. Partition was increased using "bin/kafka-topics.sh --alter" after spark job was started. I don't see messages from new partitions in the DStream. KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] ( > ssc, PreferConsistent, Subscribe[Array[Byte], Array[Byte]](topics, > kafkaParams) ) > .map(r => (r.key(), r.value())) Also, no.of partitions did not increase too. > dataStream.foreachRDD( (rdd, curTime) => { > logger.info(s"rdd has ${rdd.getNumPartitions} partitions.") Should I be setting some parameter/config? Is the doc for new integ available? Thanks, Srikanth On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <c...@koeninger.org> wrote: > No, restarting from a checkpoint won't do it, you need to re-define the > stream. > > Here's the jira for the 0.10 integration > > https://issues.apache.org/jira/browse/SPARK-12177 > > I haven't gotten docs completed yet, but there are examples at > > https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10 > > On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <srikanth...@gmail.com> wrote: > > In Spark 1.x, if we restart from a checkpoint, will it read from new > > partitions? > > > > If you can, pls point us to some doc/link that talks about Kafka 0.10 > integ > > in Spark 2.0. > > > > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <c...@koeninger.org> > wrote: > >> > >> For the integration for kafka 0.8, you are literally starting a > >> streaming job against a fixed set of topicapartitions, It will not > >> change throughout the job, so you'll need to restart the spark job if > >> you change kafka partitions. > >> > >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe > >> or subscribepattern, it should pick up new partitions as they are > >> added. 
> >> > >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <srikanth...@gmail.com> > wrote: > >> > Hello, > >> > > >> > I'd like to understand how Spark Streaming(direct) would handle Kafka > >> > partition addition? > >> > Will a running job be aware of new partitions and read from it? > >> > Since it uses Kafka APIs to query offsets and offsets are handled > >> > internally. > >> > > >> > Srikanth > > > > >
Re: Rebalancing when adding kafka partitions
Yeah, that's what I thought. We need to redefine not just restart. Thanks for the info! I do see the usage of subscribe[K,V] in your DStreams example. Looks simple but its not very obvious how it works :-) I'll watch out for the docs and ScalaDoc. Srikanth On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger <c...@koeninger.org> wrote: > No, restarting from a checkpoint won't do it, you need to re-define the > stream. > > Here's the jira for the 0.10 integration > > https://issues.apache.org/jira/browse/SPARK-12177 > > I haven't gotten docs completed yet, but there are examples at > > https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10 > > On Fri, Jul 22, 2016 at 1:05 PM, Srikanth <srikanth...@gmail.com> wrote: > > In Spark 1.x, if we restart from a checkpoint, will it read from new > > partitions? > > > > If you can, pls point us to some doc/link that talks about Kafka 0.10 > integ > > in Spark 2.0. > > > > On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <c...@koeninger.org> > wrote: > >> > >> For the integration for kafka 0.8, you are literally starting a > >> streaming job against a fixed set of topicapartitions, It will not > >> change throughout the job, so you'll need to restart the spark job if > >> you change kafka partitions. > >> > >> For the integration for kafka 0.10 / spark 2.0, if you use subscribe > >> or subscribepattern, it should pick up new partitions as they are > >> added. > >> > >> On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <srikanth...@gmail.com> > wrote: > >> > Hello, > >> > > >> > I'd like to understand how Spark Streaming(direct) would handle Kafka > >> > partition addition? > >> > Will a running job be aware of new partitions and read from it? > >> > Since it uses Kafka APIs to query offsets and offsets are handled > >> > internally. > >> > > >> > Srikanth > > > > >
Re: Rebalancing when adding kafka partitions
In Spark 1.x, if we restart from a checkpoint, will it read from new partitions? If you can, pls point us to some doc/link that talks about Kafka 0.10 integ in Spark 2.0. On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger <c...@koeninger.org> wrote: > For the integration for kafka 0.8, you are literally starting a > streaming job against a fixed set of topicapartitions, It will not > change throughout the job, so you'll need to restart the spark job if > you change kafka partitions. > > For the integration for kafka 0.10 / spark 2.0, if you use subscribe > or subscribepattern, it should pick up new partitions as they are > added. > > On Fri, Jul 22, 2016 at 11:29 AM, Srikanth <srikanth...@gmail.com> wrote: > > Hello, > > > > I'd like to understand how Spark Streaming(direct) would handle Kafka > > partition addition? > > Will a running job be aware of new partitions and read from it? > > Since it uses Kafka APIs to query offsets and offsets are handled > > internally. > > > > Srikanth >
Rebalancing when adding kafka partitions
Hello,

I'd like to understand how Spark Streaming (direct) would handle Kafka partition addition. Will a running job be aware of new partitions and read from them? I ask since it uses Kafka APIs to query offsets and offsets are handled internally.

Srikanth
Re: Streaming with broadcast joins
Sebastian,

*Update:* This is not possible. Probably will remain this way for the foreseeable future.
https://issues.apache.org/jira/browse/SPARK-3863

Srikanth

On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu <sebastian@gmail.com> wrote:

> I don't have the code with me now, and I ended moving everything to RDD in
> the end and using map operations to do some lookups, i.e. instead of
> broadcasting a Dataframe I ended broadcasting a Map
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:
>
>> It didn't fail. It wasn't broadcasting. I just ran the test again and
>> here are the logs.
>> Every batch is reading the metadata file.
>>
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>>
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>>
>> If I remember, foreachRDD is executed in the driver's context. Not sure
>> how we'll be able to achieve broadcast in this approach(unless we use SQL
>> broadcast hint again)
>>
>> When you say "it worked before", was it with an older version of spark?
>> I'm trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI
>> that broadcast join is being used. Also, if the files are read and
>> broadcasted each batch??
>>
>> Thanks for the help!
>>
>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian@gmail.com> wrote:
>>
>>> I don't see anything obviously wrong on your second approach, I've done
>>> it like that before and it worked. When you say that it didn't work what do
>>> you mean? did it fail? it didnt broadcast?
>>>
>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Code with SQL broadcast hint. This worked and I was able to see that
>>>> broadcastjoin was performed.
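[Editor's note] The Map-based workaround Sebastian mentions can be sketched as follows. The file path, schema, and column positions are illustrative assumptions, not from the original code. The key point is that the lookup table is collected to the driver and broadcast exactly once as a plain Scala Map, so each micro batch does a local map-side lookup instead of re-reading and re-broadcasting a DataFrame.

```scala
// Read the small metadata file once on the driver and collect it to a Map.
// Path and (id -> name) layout are illustrative.
val metaMap: Map[Int, String] = sqlContext.read
  .format("com.databricks.spark.csv")
  .load("file:///shared/data/meta.txt")
  .rdd
  .map(r => r.getString(0).toInt -> r.getString(1))
  .collectAsMap()
  .toMap

// Broadcast once, up front -- not inside foreachRDD.
val metaBC = sc.broadcast(metaMap)

lines.foreachRDD { rdd =>
  // Pure local lookup against the broadcast value: no shuffle, no re-read.
  val joined = rdd.map(_.split(","))
    .map(l => (l(0).toInt, l(1)))
    .map { case (id, v) => (id, v, metaBC.value.getOrElse(id, "unknown")) }
  joined.take(5).foreach(println)
}
```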
>>>> >>>> val testDF = sqlContext.read.format("com.databricks.spark.csv") >>>> >>>> .schema(schema).load("file:///shared/data/test-data.txt") >>>> >>>> val lines = ssc.socketTextStream("DevNode", ) >>>> >>>> lines.foreachRDD((rdd, timestamp) => { >>>>val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, >>>> l(1))).toDF() >>>>val resultDF = recordDF.join(testDF, "Age") >>>> >>>> >>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp) >>>> } >>>> >>>> But for every batch this file was read and broadcast was performed. >>>> Evaluating the entire DAG I guess. >>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:27+28 >>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:0+27 >>>> >>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:27+28 >>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:0+27 >>>> >>>> >>>> Then I changed code to broadcast the dataframe. This didn't work >>>> either. Not sure if this is what you meant by broadcasting a dataframe. >>>> >>>> val testDF = >>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv") >>>> >>>> .schema(schema).load("file:///shared/data/test-data.txt") >>>> ) >>>> >>>> val lines = ssc.socketTextStream("DevNode", ) >>>> >>>> lines.foreachRDD((rdd, timestamp) => { >>>> val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, >>>> l(1))).toDF() >>>> val resultDF = recordDF.join(testDF.value, "Age") >>>> >>>> >>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp) >>>> } >>>> >>>> >>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu < >>>> sebastian@gmail.com> w
Re: Streaming with broadcast joins
Sure. These may be unrelated. On Fri, Feb 19, 2016 at 10:39 AM, Jerry Lam <chiling...@gmail.com> wrote: > Hi guys, > > I also encounter broadcast dataframe issue not for steaming jobs but > regular dataframe join. In my case, the executors died probably due to OOM > which I don't think it should use that much memory. Anyway, I'm going to > craft an example and send it here to see if it is a bug or something I've > misunderstood. > > Best Regards, > > Jerry > > Sent from my iPhone > > On 19 Feb, 2016, at 10:20 am, Sebastian Piu <sebastian@gmail.com> > wrote: > > I don't have the code with me now, and I ended moving everything to RDD in > the end and using map operations to do some lookups, i.e. instead of > broadcasting a Dataframe I ended broadcasting a Map > > > On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote: > >> It didn't fail. It wasn't broadcasting. I just ran the test again and >> here are the logs. >> Every batch is reading the metadata file. >> >> 16/02/19 06:27:02 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:0+27 >> 16/02/19 06:27:02 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:27+28 >> >> 16/02/19 06:27:40 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:27+28 >> 16/02/19 06:27:40 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:0+27 >> >> If I remember, foreachRDD is executed in the driver's context. Not sure >> how we'll be able to achieve broadcast in this approach(unless we use SQL >> broadcast hint again) >> >> When you say "it worked before", was it with an older version of spark? >> I'm trying this on 1.6. >> If you still have the streaming job running can you verify in spark UI >> that broadcast join is being used. Also, if the files are read and >> broadcasted each batch?? >> >> Thanks for the help! 
>> >> >> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian....@gmail.com> >> wrote: >> >>> I don't see anything obviously wrong on your second approach, I've done >>> it like that before and it worked. When you say that it didn't work what do >>> you mean? did it fail? it didnt broadcast? >>> >>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote: >>> >>>> Code with SQL broadcast hint. This worked and I was able to see that >>>> broadcastjoin was performed. >>>> >>>> val testDF = sqlContext.read.format("com.databricks.spark.csv") >>>> >>>> .schema(schema).load("file:///shared/data/test-data.txt") >>>> >>>> val lines = ssc.socketTextStream("DevNode", ) >>>> >>>> lines.foreachRDD((rdd, timestamp) => { >>>>val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, >>>> l(1))).toDF() >>>>val resultDF = recordDF.join(testDF, "Age") >>>> >>>> >>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp) >>>> } >>>> >>>> But for every batch this file was read and broadcast was performed. >>>> Evaluating the entire DAG I guess. >>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:27+28 >>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:0+27 >>>> >>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:27+28 >>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:0+27 >>>> >>>> >>>> Then I changed code to broadcast the dataframe. This didn't work >>>> either. Not sure if this is what you meant by broadcasting a dataframe. 
>>>> >>>> val testDF = >>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv") >>>> >>>> .schema(schema).load("file:///shared/data/test-data.txt") >>>> ) >>>> >>>> val lines = ssc.socketTextStream("DevNode", ) >>>> >>>> lines.foreachRDD((rdd, timestamp) => { >>>> val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, >>>> l(1))).toDF() >>>> val resultDF = recordDF.join(testDF
Re: Streaming with broadcast joins
Hmmm..OK. Srikanth On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu <sebastian@gmail.com> wrote: > I don't have the code with me now, and I ended moving everything to RDD in > the end and using map operations to do some lookups, i.e. instead of > broadcasting a Dataframe I ended broadcasting a Map > > > On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote: > >> It didn't fail. It wasn't broadcasting. I just ran the test again and >> here are the logs. >> Every batch is reading the metadata file. >> >> 16/02/19 06:27:02 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:0+27 >> 16/02/19 06:27:02 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:27+28 >> >> 16/02/19 06:27:40 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:27+28 >> 16/02/19 06:27:40 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:0+27 >> >> If I remember, foreachRDD is executed in the driver's context. Not sure >> how we'll be able to achieve broadcast in this approach(unless we use SQL >> broadcast hint again) >> >> When you say "it worked before", was it with an older version of spark? >> I'm trying this on 1.6. >> If you still have the streaming job running can you verify in spark UI >> that broadcast join is being used. Also, if the files are read and >> broadcasted each batch?? >> >> Thanks for the help! >> >> >> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian@gmail.com> >> wrote: >> >>> I don't see anything obviously wrong on your second approach, I've done >>> it like that before and it worked. When you say that it didn't work what do >>> you mean? did it fail? it didnt broadcast? >>> >>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote: >>> >>>> Code with SQL broadcast hint. This worked and I was able to see that >>>> broadcastjoin was performed. 
>>>> >>>> val testDF = sqlContext.read.format("com.databricks.spark.csv") >>>> >>>> .schema(schema).load("file:///shared/data/test-data.txt") >>>> >>>> val lines = ssc.socketTextStream("DevNode", ) >>>> >>>> lines.foreachRDD((rdd, timestamp) => { >>>>val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, >>>> l(1))).toDF() >>>>val resultDF = recordDF.join(testDF, "Age") >>>> >>>> >>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp) >>>> } >>>> >>>> But for every batch this file was read and broadcast was performed. >>>> Evaluating the entire DAG I guess. >>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:27+28 >>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:0+27 >>>> >>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:27+28 >>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split: >>>> file:/shared/data/test-data.txt:0+27 >>>> >>>> >>>> Then I changed code to broadcast the dataframe. This didn't work >>>> either. Not sure if this is what you meant by broadcasting a dataframe. >>>> >>>> val testDF = >>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv") >>>> >>>> .schema(schema).load("file:///shared/data/test-data.txt") >>>> ) >>>> >>>> val lines = ssc.socketTextStream("DevNode", ) >>>> >>>> lines.foreachRDD((rdd, timestamp) => { >>>> val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, >>>> l(1))).toDF() >>>> val resultDF = recordDF.join(testDF.value, "Age") >>>> >>>> >>>> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp) >>>> } >>>> >>>> >>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu < >>>> sebastian@gmail.com> wrote: >>>> >>>>> Can you paste the code where you use sc.broadcast ? >>>>> >>>>> O
Re: listening to recursive folder structures in s3 using pyspark streaming (textFileStream)
Apparently you can pass comma separated folders. Try the suggestion given here --> http://stackoverflow.com/questions/29426246/spark-streaming-textfilestream-not-supporting-wildcards Let me know if this helps Srikanth On Wed, Feb 17, 2016 at 5:47 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com > wrote: > textFileStream doesn't support that. It only supports monitoring one > folder. > > On Wed, Feb 17, 2016 at 7:20 AM, in4maniac <sa...@skimlinks.com> wrote: > >> Hi all, >> >> I am new to pyspark streaming and I was following a tutorial I saw in the >> internet >> ( >> https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/network_wordcount.py >> ). >> But I replaced the data input with an s3 directory path as: >> >> lines = ssc.textFileStream("s3n://bucket/first/second/third1/") >> >> When I run the code and upload a file to s3n://bucket/first/second/third1/ >> (such as s3n://bucket/first/second/third1/test1.txt), the file gets >> processed as expected. >> >> Now I want it to listen to multiple directories and process files if they >> get uploaded to any of the directories: >> for example : [s3n://bucket/first/second/third1/, >> s3n://bucket/first/second/third2/ and s3n://bucket/first/second/third3/] >> >> I tried to use the pattern similar to sc.TextFile as : >> >> lines = ssc.textFileStream("s3n://bucket/first/second/*/") >> >> But this actually didn't work. Can someone please explain to me how I >> could >> achieve my objective? >> >> thanks in advance !!! >> >> in4maniac >> >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/listening-to-recursive-folder-structures-in-s3-using-pyspark-streaming-textFileStream-tp26247.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >
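[Editor's note] If the comma-separated path doesn't work in your Spark version, another commonly used workaround is to create one stream per directory and union them. A sketch in Scala (the directory names are taken from the post; `ssc` is an existing StreamingContext):

```scala
// One DStream per monitored directory, merged into a single stream of lines.
val dirs = Seq(
  "s3n://bucket/first/second/third1/",
  "s3n://bucket/first/second/third2/",
  "s3n://bucket/first/second/third3/")

val lines = ssc.union(dirs.map(dir => ssc.textFileStream(dir)))

lines.foreachRDD(rdd => println(s"new lines this batch: ${rdd.count()}"))
```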
Re: Streaming with broadcast joins
It didn't fail. It wasn't broadcasting. I just ran the test again and here are the logs. Every batch is reading the metadata file. 16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27 16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28 16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28 16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27 If I remember, foreachRDD is executed in the driver's context. Not sure how we'll be able to achieve broadcast in this approach(unless we use SQL broadcast hint again) When you say "it worked before", was it with an older version of spark? I'm trying this on 1.6. If you still have the streaming job running can you verify in spark UI that broadcast join is being used. Also, if the files are read and broadcasted each batch?? Thanks for the help! On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian@gmail.com> wrote: > I don't see anything obviously wrong on your second approach, I've done it > like that before and it worked. When you say that it didn't work what do > you mean? did it fail? it didnt broadcast? > > On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote: > >> Code with SQL broadcast hint. This worked and I was able to see that >> broadcastjoin was performed. >> >> val testDF = sqlContext.read.format("com.databricks.spark.csv") >>.schema(schema).load("file:///shared/data/test-data.txt") >> >> val lines = ssc.socketTextStream("DevNode", ) >> >> lines.foreachRDD((rdd, timestamp) => { >>val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, >> l(1))).toDF() >>val resultDF = recordDF.join(testDF, "Age") >> >> >> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp) >> } >> >> But for every batch this file was read and broadcast was performed. >> Evaluating the entire DAG I guess. 
>> 16/02/18 12:24:02 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:27+28 >> 16/02/18 12:24:02 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:0+27 >> >> 16/02/18 12:25:00 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:27+28 >> 16/02/18 12:25:00 INFO HadoopRDD: Input split: >> file:/shared/data/test-data.txt:0+27 >> >> >> Then I changed code to broadcast the dataframe. This didn't work either. >> Not sure if this is what you meant by broadcasting a dataframe. >> >> val testDF = >> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv") >> .schema(schema).load("file:///shared/data/test-data.txt") >> ) >> >> val lines = ssc.socketTextStream("DevNode", ) >> >> lines.foreachRDD((rdd, timestamp) => { >> val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, >> l(1))).toDF() >> val resultDF = recordDF.join(testDF.value, "Age") >> >> >> resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp) >> } >> >> >> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian@gmail.com> >> wrote: >> >>> Can you paste the code where you use sc.broadcast ? >>> >>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote: >>> >>>> Sebastian, >>>> >>>> I was able to broadcast using sql broadcast hint. Question is how to >>>> prevent this broadcast for each RDD. >>>> Is there a way where it can be broadcast once and used locally for each >>>> RDD? >>>> Right now every batch the metadata file is read and the DF is >>>> broadcasted. >>>> I tried sc.broadcast and that did not provide this behavior. >>>> >>>> Srikanth >>>> >>>> >>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian@gmail.com >>>> > wrote: >>>> >>>>> You should be able to broadcast that data frame using sc.broadcast and >>>>> join against it. 
>>>>> >>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote: >>>>> >>>>>> Hello, >>>>>> >>>>>> I have a streaming use case where I plan to keep a
Re: Streaming with broadcast joins
Code with SQL broadcast hint. This worked and I was able to see that broadcastjoin was performed.

val testDF = sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema).load("file:///shared/data/test-data.txt")

val lines = ssc.socketTextStream("DevNode", )

lines.foreachRDD((rdd, timestamp) => {
  val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
  val resultDF = recordDF.join(testDF, "Age")
  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
})

But for every batch this file was read and broadcast was performed. Evaluating the entire DAG I guess.

16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27

16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27

Then I changed code to broadcast the dataframe. This didn't work either. Not sure if this is what you meant by broadcasting a dataframe.

val testDF = sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema).load("file:///shared/data/test-data.txt")
)

val lines = ssc.socketTextStream("DevNode", )

lines.foreachRDD((rdd, timestamp) => {
  val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
  val resultDF = recordDF.join(testDF.value, "Age")
  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
})

On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian@gmail.com> wrote:

> Can you paste the code where you use sc.broadcast ?
>
> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com> wrote:
>
>> Sebastian,
>>
>> I was able to broadcast using sql broadcast hint. Question is how to
>> prevent this broadcast for each RDD.
>> Is there a way where it can be broadcast once and used locally for each
>> RDD?
>> Right now every batch the metadata file is read and the DF is broadcasted. >> I tried sc.broadcast and that did not provide this behavior. >> >> Srikanth >> >> >> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian@gmail.com> >> wrote: >> >>> You should be able to broadcast that data frame using sc.broadcast and >>> join against it. >>> >>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote: >>> >>>> Hello, >>>> >>>> I have a streaming use case where I plan to keep a dataset broadcasted >>>> and cached on each executor. >>>> Every micro batch in streaming will create a DF out of the RDD and join >>>> the batch. >>>> The below code will perform the broadcast operation for each RDD. Is >>>> there a way to broadcast it just once? >>>> >>>> Alternate approachs are also welcome. >>>> >>>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1) >>>> >>>> val metaDF = >>>> sqlContext.read.format("json").schema(schema1).load(file2) >>>> .join(DF1, "id") >>>> metaDF.cache >>>> >>>> >>>> val lines = streamingcontext.textFileStream(path) >>>> >>>> lines.foreachRDD( rdd => { >>>> val recordDF = rdd.flatMap(r => Record(r)).toDF() >>>> val joinedDF = recordDF.join(broadcast(metaDF), "id") >>>> >>>> joinedDF.rdd.foreachPartition ( partition => { >>>> partition.foreach( row => { >>>> ... >>>> ... >>>> }) >>>> }) >>>> }) >>>> >>>> streamingcontext.start >>>> >>>> On a similar note, if the metaDF is too big for broadcast, can I >>>> partition it(df.repartition($"col")) and also partition each streaming RDD? >>>> This way I can avoid shuffling metaDF each time. >>>> >>>> Let me know you thoughts. >>>> >>>> Thanks >>>> >>>> >>
Re: Streaming with broadcast joins
Sebastian, I was able to broadcast using sql broadcast hint. Question is how to prevent this broadcast for each RDD. Is there a way where it can be broadcast once and used locally for each RDD? Right now every batch the metadata file is read and the DF is broadcasted. I tried sc.broadcast and that did not provide this behavior. Srikanth On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian@gmail.com> wrote: > You should be able to broadcast that data frame using sc.broadcast and > join against it. > > On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote: > >> Hello, >> >> I have a streaming use case where I plan to keep a dataset broadcasted >> and cached on each executor. >> Every micro batch in streaming will create a DF out of the RDD and join >> the batch. >> The below code will perform the broadcast operation for each RDD. Is >> there a way to broadcast it just once? >> >> Alternate approachs are also welcome. >> >> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1) >> >> val metaDF = >> sqlContext.read.format("json").schema(schema1).load(file2) >> .join(DF1, "id") >> metaDF.cache >> >> >> val lines = streamingcontext.textFileStream(path) >> >> lines.foreachRDD( rdd => { >> val recordDF = rdd.flatMap(r => Record(r)).toDF() >> val joinedDF = recordDF.join(broadcast(metaDF), "id") >> >> joinedDF.rdd.foreachPartition ( partition => { >> partition.foreach( row => { >> ... >> ... >> }) >> }) >> }) >> >> streamingcontext.start >> >> On a similar note, if the metaDF is too big for broadcast, can I >> partition it(df.repartition($"col")) and also partition each streaming RDD? >> This way I can avoid shuffling metaDF each time. >> >> Let me know you thoughts. >> >> Thanks >> >>
Streaming with broadcast joins
Hello,

I have a streaming use case where I plan to keep a dataset broadcast and cached on each executor. Every micro batch in streaming will create a DF out of the RDD and join the batch. The code below performs the broadcast operation for each RDD. Is there a way to broadcast it just once?

Alternate approaches are also welcome.

val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)

val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
  .join(DF1, "id")
metaDF.cache

val lines = streamingcontext.textFileStream(path)

lines.foreachRDD( rdd => {
  val recordDF = rdd.flatMap(r => Record(r)).toDF()
  val joinedDF = recordDF.join(broadcast(metaDF), "id")

  joinedDF.rdd.foreachPartition ( partition => {
    partition.foreach( row => {
      ...
      ...
    })
  })
})

streamingcontext.start

On a similar note, if the metaDF is too big for broadcast, can I partition it (df.repartition($"col")) and also partition each streaming RDD? This way I can avoid shuffling metaDF each time.

Let me know your thoughts.

Thanks
spark-csv partitionBy
Hello,

I want to save a Spark job result as LZO-compressed CSV files partitioned by one or more columns. Given that partitionBy is not supported by spark-csv, is there any recommendation for achieving this in user code?

One quick option is to:
i) cache the result dataframe
ii) get the unique partition keys
iii) iterate over the keys and filter the result for each key

rawDF.cache
val idList = rawDF.select($"ID").distinct.collect.toList.map(_.getLong(0))
idList.foreach( id => {
  val rows = rawDF.filter($"ID" === id)
  rows.write.format("com.databricks.spark.csv").save(s"hdfs:///output/id=$id/")
})

This approach doesn't scale well, especially since the number of unique IDs can be between 500-700, and adding a second partition column will make this even worse. Wondering if anyone has an efficient workaround?

Srikanth
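[Editor's note] For reference, the CSV source built into later Spark releases (2.0+) supports partitionBy on write, which replaces the filter loop with a single pass over the data. A sketch, assuming a DataFrame with an ID column; note that LZO is not among the built-in compression codecs, so gzip is shown here as a stand-in:

```scala
// Spark 2.x native CSV writer: one output directory per distinct ID,
// written in a single job instead of one filtered job per key.
rawDF.write
  .partitionBy("ID")
  .option("compression", "gzip")
  .csv("hdfs:///output/")
```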
Re: Broadcast join on multiple dataframes
Hello,

Any pointers on what is causing the optimizer to convert broadcast to shuffle join? This join is with a file that is just 4kb in size.

Complete plan --> https://www.dropbox.com/s/apuomw1dg0t1jtc/plan_with_select.txt?dl=0
DAG from UI --> https://www.dropbox.com/s/4xc9d0rdkx2fun8/DAG_with_select.PNG?dl=0

== Optimized Logical Plan ==
Project [...]
+- Join LeftOuter, Some((start_ip#48L = start_ip_num#144L))
:- Project [...]
: +- Join Inner, Some((cast(creative_id#9 as bigint) = creative_id#130L))
: :- Project [...]
: : +- Join Inner, Some((cast(strategy_id#10 as bigint) = strategy_id#126L))
: : :- Project [...]
: : : +- Join LeftOuter, Some((cast(exchange_id#13 as bigint) = id#142L))
: : : :- Project [...]
: : : : +- Join LeftOuter, Some((browser_id#59 = technology_key#169))
: : : : :- Project [...]
: : : : : +- Join LeftOuter, Some((primary_browser_language#61 = id#166))
: : : : : :- Project [...]
: : : : : : +- Filter ((NOT (campaign_id#12 = 0) && (mm_int_cost#36 < 100.0)) && ((cost_sum#41 < 100.0) && (total_spend#42 < 100.0)))
: : : : : : +- Relation[...)
: : : : : +- Project [id#166,two_letter_code#167]
: : : : : +- BroadcastHint
: : : : : +- Relation[...
: : : : +- BroadcastHint
: : : : +- Relation[...
: : : +- Project [description#141,id#142L]
: : : +- BroadcastHint
: : : +- Relation[description#141,id#142L,name#143] JSONRelation

== Physical Plan ==
Project [...]
+- SortMergeOuterJoin [start_ip#48L], [start_ip_num#144L], LeftOuter, None
:- Sort [start_ip#48L ASC], false, 0
: +- TungstenExchange hashpartitioning(start_ip#48L,480), None
: +- Project [...]
: +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#130L], BuildRight
: :- Project [...]
: : +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#126L], BuildRight
: : :- Project [...]
: : : +- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#142L], LeftOuter, None
: : : :- Project [...]
: : : : +- BroadcastHashOuterJoin [browser_id#59], [technology_key#169], LeftOuter, None
: : : : :- Project [...]
: : : : : +- SortMergeOuterJoin [primary_browser_language#61], [id#166], LeftOuter, None
: : : : : :- Sort [primary_browser_language#61 ASC], false, 0
: : : : : : +- TungstenExchange hashpartitioning(primary_browser_language#61,480), None
: : : : : : +- Project [...]
: : : : : : +- Filter (((NOT (campaign_id#12 = 0) && (mm_int_cost#36 < 100.0)) && (cost_sum#41 < 100.0)) && (total_spend#42 < 100.0))
: : : : : : +- Scan CsvRelation(,Some(s3://
: : : : : +- Sort [id#166 ASC], false, 0
: : : : : +- TungstenExchange hashpartitioning(id#166,480), None
: : : : : +- Project [id#166,two_letter_code#167]
: : : : : +- Scan CsvRelation(,Some(s3
: : : : +- ConvertToUnsafe
: : : : +- Scan CsvRelation(,Some(s3://
: : : +- Project [description#141,id#142L]
: : : +- Scan JSONRelation[description#141,id#142L,name#143] InputPaths: s3://
: : +- Project
Re: Broadcast join on multiple dataframes
Michael,

Output of DF.queryExecution is saved to https://www.dropbox.com/s/1vizuwpswza1e3x/plan.txt?dl=0
I don't see anything in this to suggest a switch in strategy. Hopefully you find this helpful.

Srikanth

On Thu, Jan 28, 2016 at 4:43 PM, Michael Armbrust <mich...@databricks.com> wrote:

> Can you provide the analyzed and optimized plans (explain(true))
>
> On Thu, Jan 28, 2016 at 12:26 PM, Srikanth <srikanth...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a use case where one large table has to be joined with several
>> smaller tables.
>> I've added broadcast hint for all small tables in the joins.
>>
>> val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")
>>
>> val metaActionDF = sqlContext.read.format("json")
>> val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
>> val metaLocationDF = sqlContext.read.format("json").option("samplingRatio","0.1").load(metaLocationFile)
>>   .join(broadcast(metaActionDF), "campaign_id")
>>   .join(broadcast(cidOrgDF), List("organization_id"), "left_outer")
>>
>> val metaCreativeDF = sqlContext.read.format("json")
>> val metaExchangeDF = sqlContext.read.format("json")
>> val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
>> val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")
>>
>> val joinedBidderDF = largeTableDF.as("BID")
>>   .join(broadcast(metaLocationDF), "strategy_id")
>>   .join(broadcast(metaCreativeDF), "creative_id")
>>   .join(broadcast(metaExchangeDF), $"exchange_id" === $"id" , "left_outer")
>>   .join(broadcast(techKeyDF).as("TK"), $"BID.tech_id" === $"TK.tech_key" , "left_outer")
>>   .join(broadcast(localizationDF).as("BL"), $"BID.language" === $"BL.id" , "left_outer")
>>
>> When I look at the execution plan, all the joins are marked as
>> broadcastjoin.
>> But when I look at the spark job UI, the DAG visualization shows that
>> some joins are sortmerged with shuffle involved.
>> The ones that I've highlighted in yellow were shuffled.
>> DAG can be viewed here - >> https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0 >> >> Why is the actual execution as seen in the DAG different from the >> physical plan pasted below. >> I'm trying not to shuffle my largeTable. Any idea what is causing this? >> >> == Physical Plan == >> >> BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None >> >> :- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None >> >> : :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L], >> LeftOuter, None >> >> : : :- Project [...] >> >> : : : +- BroadcastHashJoin [cast(creative_id#9 as bigint)], >> [creative_id#131L], BuildRight >> >> : : : :- Project [...] >> >> : : : : +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], >> [strategy_id#127L], BuildRight >> >> : : : : :- ConvertToUnsafe >> >> : : : : : +- Scan >> CsvRelation(,Some(file:///shared/data/bidder/*.lzo),false, >> >> : : : : +- Project [...] >> >> : : : :+- BroadcastHashOuterJoin [organization_id#90L], >> [cast(organization_id#102 as bigint)], LeftOuter, None >> >> : : : : :- Project [...] >> >> : : : : : +- BroadcastHashJoin [campaign_id#105L], >> [campaign_id#75L], BuildRight >> >> : : : : : :- Project [...] 
>> >> : : : : : : +- Scan >> JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths: >> file:/shared/data/t1_meta/t1_meta_strategy.jsonl >> >> : : : : : +- Scan JSONRelation[] InputPaths: >> file:/shared/data/t1_meta/t1_meta_campaign.jsonl >> >> : : : : +- ConvertToUnsafe >> >> : : : : +- Scan >> CsvRelation(,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,, >> >> : : : +- Scan >> JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] >> InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl >> >> : : +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: >> file:/shared/data/t1_meta/t1_meta_exchange.jsonl >> >> : +- ConvertToUnsafe >> >> : +- Scan >> CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false, >> >> >> +- ConvertToUnsafe >> >>+- Scan >> CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false >> >> >> >> Srikanth >> > >
Broadcast join on multiple dataframes
Hello,

I have a use case where one large table has to be joined with several smaller tables. I've added a broadcast hint for all small tables in the joins.

val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")

val metaActionDF = sqlContext.read.format("json")
val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
val metaLocationDF = sqlContext.read.format("json").option("samplingRatio","0.1").load(metaLocationFile)
                        .join(broadcast(metaActionDF), "campaign_id")
                        .join(broadcast(cidOrgDF), List("organization_id"), "left_outer")

val metaCreativeDF = sqlContext.read.format("json")
val metaExchangeDF = sqlContext.read.format("json")
val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")

val joinedBidderDF = largeTableDF.as("BID")
        .join(broadcast(metaLocationDF), "strategy_id")
        .join(broadcast(metaCreativeDF), "creative_id")
        .join(broadcast(metaExchangeDF), $"exchange_id" === $"id" , "left_outer")
        .join(broadcast(techKeyDF).as("TK"), $"BID.tech_id" === $"TK.tech_key" , "left_outer")
        .join(broadcast(localizationDF).as("BL"), $"BID.language" === $"BL.id" , "left_outer")

When I look at the execution plan, all the joins are marked as broadcast joins. But when I look at the Spark job UI, the DAG visualization shows that some joins are sort-merged with a shuffle involved. The ones that I've highlighted in yellow were shuffled. The DAG can be viewed here - https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0

Why is the actual execution as seen in the DAG different from the physical plan pasted below? I'm trying not to shuffle my largeTable. Any idea what is causing this?

== Physical Plan ==
BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
:- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
:  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L], LeftOuter, None
:  :  :- Project [...]
:  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#131L], BuildRight
:  :  :     :- Project [...]
:  :  :     :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#127L], BuildRight
:  :  :     :     :- ConvertToUnsafe
:  :  :     :     :  +- Scan CsvRelation(,Some(file:///shared/data/bidder/*.lzo),false,
:  :  :     :     +- Project [...]
:  :  :     :        +- BroadcastHashOuterJoin [organization_id#90L], [cast(organization_id#102 as bigint)], LeftOuter, None
:  :  :     :           :- Project [...]
:  :  :     :           :  +- BroadcastHashJoin [campaign_id#105L], [campaign_id#75L], BuildRight
:  :  :     :           :     :- Project [...]
:  :  :     :           :     :  +- Scan JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths: file:/shared/data/t1_meta/t1_meta_strategy.jsonl
:  :  :     :           :     +- Scan JSONRelation[] InputPaths: file:/shared/data/t1_meta/t1_meta_campaign.jsonl
:  :  :     :           +- ConvertToUnsafe
:  :  :     :              +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
:  :  :     +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
:  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl
:  +- ConvertToUnsafe
:     +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false,
+- ConvertToUnsafe
   +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false

Srikanth
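One hedged way to narrow this down (a diagnostic sketch only, reusing the DataFrame names from the message above): build the join chain one table at a time and print the plan at each step, so the first step whose physical plan no longer shows a broadcast join identifies the table that triggers the shuffle.

```scala
// Illustrative diagnostic, Spark 1.6-era API. Assumes the DataFrames
// (largeTableDF, metaLocationDF, ...) defined in the message above.
val step1 = largeTableDF.as("BID").join(broadcast(metaLocationDF), "strategy_id")
step1.explain(true)   // compare analyzed vs. physical plan

val step2 = step1.join(broadcast(metaCreativeDF), "creative_id")
step2.explain(true)
// ...continue similarly for metaExchangeDF, techKeyDF and localizationDF,
// checking at which step a SortMergeJoin/Exchange first appears.
```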
Adding additional jars to distributed cache (yarn-client)
Hi, I am trying to use JavaSparkContext() to create a new SparkContext and attempted to pass the requisite jars, but it looks like they aren't getting added to the distributed cache automatically. Looking into YarnClientSchedulerBackend::start() and ClientArguments, it did seem like it would just add the SPARK_JAR and APP_JAR. I am wondering what is the best way to add additional files to the distributed cache and also have them appear in the classpath for ExecutorLauncher. Thanks Srikanth Sundarrajan
DataFrame collect() works but count() fails
Hello, I'm seeing a strange behavior where count() on a DataFrame errors as shown below but collect() works fine. This is what I tried from spark-shell. solrRDD.queryShards() returns a JavaRDD.

scala> val rdd = solrRDD.queryShards(sc, query, "_version_", 2).rdd
rdd: org.apache.spark.rdd.RDD[org.apache.solr.common.SolrDocument] = MapPartitionsRDD[3] at flatMap at SolrRDD.java:335

scala> val schema = solrRDD.getQuerySchema(query)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ApplicationType,StringType,true), StructField(Language,StringType,true), StructField(MfgCode,StringType,true), StructField(OpSystemCode,StringType,true), StructField(ProductCode,StringType,true), StructField(ProductName,StringType,true), StructField(ProductVersion,StringType,true), StructField(_version_,LongType,true), StructField(id,StringType,true))

scala> val rows = rdd.map(doc => RowFactory.create(schema.fieldNames.map(f => doc.getFirstValue(f))) ) // Convert RDD[SolrDocument] to RDD[Row]

scala> val df = sqlContext.createDataFrame(rows, schema)

scala> val data = df.collect
data: Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@2135773a], [[Ljava.lang.Object;@3d2691de], [[Ljava.lang.Object;@2f32a52f], [[Ljava.lang.Object;@25fac8de]

scala> df.count
15/08/26 14:53:28 WARN TaskSetManager: Lost task 1.3 in stage 6.0 (TID 42, 172.19.110.1): java.lang.AssertionError: assertion failed: Row column number mismatch, expected 9 columns, but got 1. Row content: [[Ljava.lang.Object;@1d962eb2]
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:140)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)

Any idea what is wrong here? Srikanth
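For what it's worth, the "expected 9 columns, but got 1" assertion is consistent with a varargs pitfall: RowFactory.create takes Object..., so passing a Scala Array as a single argument produces a one-column Row holding the whole array (which is also why collect prints [Ljava.lang.Object;@...). A hedged fix sketch is to expand the array with : _* so each field becomes its own column:

```scala
import org.apache.spark.sql.RowFactory

// Sketch of a possible fix (reuses rdd/schema from the message above):
// expand the per-document field array into varargs so RowFactory.create
// sees nine separate values instead of one Array[Object].
val rows = rdd.map { doc =>
  RowFactory.create(schema.fieldNames.map(f => doc.getFirstValue(f)): _*)
}
val df = sqlContext.createDataFrame(rows, schema)
```

collect() happens to "work" on the malformed rows because nothing validates the column count until the columnar caching path (InMemoryColumnarTableScan) walks the row fields.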
Re: Estimate size of Dataframe programmatically
SizeEstimator.estimate(df) will not give the size of the dataframe, right? I think it will give the size of the df object. With RDD, I sample() and collect() and sum the size of each row. If I do the same with a dataframe, it will no longer be the size when represented in columnar format. I'd also like to know how spark.sql.autoBroadcastJoinThreshold estimates the size of a dataframe. Is it going to broadcast when the columnar storage size is less than 10 MB? Srikanth On Fri, Aug 7, 2015 at 2:51 PM, Ted Yu yuzhih...@gmail.com wrote: Have you tried calling SizeEstimator.estimate() on a DataFrame? I did the following in REPL: scala> SizeEstimator.estimate(df) res1: Long = 17769680 FYI On Fri, Aug 7, 2015 at 6:48 AM, Srikanth srikanth...@gmail.com wrote: Hello, Is there a way to estimate the approximate size of a dataframe? I know we can cache and look at the size in the UI but I'm trying to do this programmatically. With RDD, I can sample and sum up size using SizeEstimator, then extrapolate it to the entire RDD. That will give me the approximate size of the RDD. With dataframes, it's tricky due to columnar storage. How do we do it? On a related note, I see the size of the RDD object to be ~60MB. Is that the footprint of the RDD in the driver JVM? scala> val temp = sc.parallelize(Array(1,2,3,4,5,6)) scala> SizeEstimator.estimate(temp) res13: Long = 69507320 Srikanth
Estimate size of Dataframe programmatically
Hello, Is there a way to estimate the approximate size of a dataframe? I know we can cache and look at the size in the UI but I'm trying to do this programmatically. With RDD, I can sample and sum up size using SizeEstimator, then extrapolate it to the entire RDD. That will give me the approximate size of the RDD. With dataframes, it's tricky due to columnar storage. How do we do it? On a related note, I see the size of the RDD object to be ~60MB. Is that the footprint of the RDD in the driver JVM? scala> val temp = sc.parallelize(Array(1,2,3,4,5,6)) scala> SizeEstimator.estimate(temp) res13: Long = 69507320 Srikanth
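The sampling approach described above can be sketched for a DataFrame as well. This is a rough estimate only, under stated assumptions: it measures the size of the materialized Row objects, not the columnar in-memory cache size, and the 1% fraction is illustrative.

```scala
import org.apache.spark.util.SizeEstimator

// Sample a fraction of rows, measure the sample on the driver with
// SizeEstimator, and extrapolate by the sampling fraction.
val fraction = 0.01
val sample = df.sample(withReplacement = false, fraction).collect()
val sampleBytes = sample.map(r => SizeEstimator.estimate(r)).sum
val approxBytes = (sampleBytes / fraction).toLong
```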
How does DataFrame except work?
Hello, I'm planning to use DF1.except(DF2) to get the difference between two dataframes. I'd like to know how exactly this API works. Both explain() and the Spark UI show except as an operation on its own. Internally, does it do a hash partition of both dataframes? If so, will it do an auto broadcast if the second dataframe is small enough? Srikanth
spark-csv number of partitions
Hello, I'm using spark-csv instead of sc.textFile() to work with CSV files. How can I set the number of partitions that will be created when reading a CSV? Basically an equivalent for minPartitions in textFile(): val myrdd = sc.textFile("my.csv", 24) Srikanth
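As far as I'm aware, spark-csv did not expose a minPartitions knob the way textFile() does; a common workaround (a sketch only — the path and partition count are illustrative) is to repartition right after the read:

```scala
// Repartitioning after the read achieves the effect of textFile's
// minPartitions, at the cost of one shuffle of the parsed rows.
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("my.csv")
  .repartition(24)
```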
spark.deploy.spreadOut core allocation
Hello, I've set spark.deploy.spreadOut=false in spark-env.sh: export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.deploy.spreadOut=false" There are 3 workers, each with 4 cores. Spark-shell was started with number of cores = 6. Spark UI shows that one executor was used with 6 cores. Is this a bug? This is with Spark 1.4. [image: Inline image 1] Srikanth
Re: spark.deploy.spreadOut core allocation
Cool. Thanks! Srikanth On Wed, Jul 22, 2015 at 3:12 PM, Andrew Or and...@databricks.com wrote: Hi Srikanth, I was able to reproduce the issue by setting `spark.cores.max` to a number greater than the number of cores on a worker. I've filed SPARK-9260, which I believe is already being fixed in https://github.com/apache/spark/pull/7274. Thanks for reporting the issue! -Andrew 2015-07-22 11:49 GMT-07:00 Andrew Or and...@databricks.com: Hi Srikanth, It does look like a bug. Did you set `spark.executor.cores` in your application by any chance? -Andrew 2015-07-22 8:05 GMT-07:00 Srikanth srikanth...@gmail.com: Hello, I've set spark.deploy.spreadOut=false in spark-env.sh: export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.deploy.spreadOut=false" There are 3 workers, each with 4 cores. Spark-shell was started with number of cores = 6. Spark UI shows that one executor was used with 6 cores. Is this a bug? This is with Spark 1.4. [image: Inline image 1] Srikanth
ShuffledHashJoin instead of CartesianProduct
Hello, I'm trying to link records from two large data sources. Both datasets have almost the same number of rows. The goal is to match records based on multiple columns.

val matchId = SFAccountDF.as("SF").join(ELAccountDF.as("EL")).where($"SF.Email" === $"EL.EmailAddress" || $"SF.Phone" === $"EL.Phone")

Joining with an OR (||) will result in a CartesianProduct. I'm trying to avoid that. One way to do this is to join on each column and UNION the results.

val phoneMatch = SFAccountDF.as("SF").filter("Phone != ''").join(ELAccountDF.as("EL").filter("BusinessPhone != ''")).where($"SF.Phone" === $"EL.BusinessPhone")
val emailMatch = SFAccountDF.as("SF").filter("Email != ''").join(ELAccountDF.as("EL").filter("EmailAddress != ''")).where($"SF.Email" === $"EL.EmailAddress")

val matchId = phoneMatch.unionAll(emailMatch.unionAll(faxMatch.unionAll(mobileMatch)))
matchId.cache().registerTempTable("matchId")

Is there a more elegant way to do this? On a related note, has anyone worked on record linkage using Bloom Filters, Levenshtein distance, etc. in Spark? Srikanth
Re: RowId unique key for Dataframes
Will work. Thanks! zipWithUniqueId() doesn't guarantee continuous IDs either. Srikanth On Tue, Jul 21, 2015 at 9:48 PM, Burak Yavuz brk...@gmail.com wrote: Would monotonicallyIncreasingId https://github.com/apache/spark/blob/d4c7a7a3642a74ad40093c96c4bf45a62a470605/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L637 work for you? Best, Burak On Tue, Jul 21, 2015 at 4:55 PM, Srikanth srikanth...@gmail.com wrote: Hello, I'm creating dataframes from three CSV files using the spark-csv package. I want to add a unique ID for each row in the dataframe. Not sure how withColumn() can be used to achieve this. I need a Long value, not a UUID. One option I found was to create an RDD and use zipWithUniqueId:

sqlContext.textFile(file).
  zipWithUniqueId().
  map{ case (d, i) => i.toString + delimiter + d }.
  map(_.split(delimiter)).
  map(s => caseclass(...)).
  toDF().select("field1", "field2")

It's a bit hacky. Is there an easier way to do this on dataframes and use spark-csv? Srikanth
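Burak's suggestion can be sketched as below (assuming the `functions.monotonicallyIncreasingId` he links to, which was in Spark master at the time; the column name is illustrative). Like zipWithUniqueId, the generated IDs are unique and increasing but not contiguous, so gaps are expected.

```scala
import org.apache.spark.sql.functions.monotonicallyIncreasingId

// Adds a unique Long per row without converting to an RDD first.
val withId = df.withColumn("rowId", monotonicallyIncreasingId())
```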
Re: HiveThriftServer2.startWithContext error with registerTempTable
Cheng, Yes, select * from temp_table was working. I was able to perform some transformation+action on the dataframe and print it on console. HiveThriftServer2.startWithContext was being run on the same session. When you say try --jars option, are you asking me to pass spark-csv jar? I'm already doing this with --packages com.databricks:spark-csv_2.10:1.0.3 Not sure if I'm missing your point here. Anyways, I gave it a shot. I downloaded spark-csv_2.10-0.1.jar and started spark-shell with --jars. I still get the same exception. I'm pasting the exception below. scala 15/07/16 11:29:22 ERROR SparkExecuteStatementOperation: Error executing query: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 15/07/16 11:29:22 WARN ThriftCLIService: Error executing statement: org.apache.hive.service.cli.HiveSQLException: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1 at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Srikanth On Thu, Jul 16, 2015 at 12:44 AM, Cheng, Hao hao.ch...@intel.com wrote: Have you ever try query the “select * from temp_table” from the spark shell? Or can you try the option --jars while starting the spark shell? *From:* Srikanth [mailto:srikanth...@gmail.com] *Sent:* Thursday, July 16, 2015 9:36 AM *To:* user *Subject:* Re: HiveThriftServer2.startWithContext error with registerTempTable Hello, Re-sending this to see if I'm second time lucky! I've not managed to move past this error. 
Srikanth On Mon, Jul 13, 2015 at 9:14 PM, Srikanth srikanth...@gmail.com wrote: Hello, I want to expose result of Spark computation to external tools. I plan to do this with Thrift server JDBC interface by registering result Dataframe as temp table. I wrote a sample program in spark-shell to test this. val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext.implicits._ HiveThriftServer2.startWithContext(hiveContext) val myDF = hiveContext.read.format(com.databricks.spark.csv).option(header, true).load(/datafolder/weblog/pages.csv) myDF.registerTempTable(temp_table) I'm able to see the temp table in Beeline +-+--+ | tableName | isTemporary | +-+--+ | temp_table | true | | my_table| false| +-+--+ Now when I issue select * from temp_table from Beeline, I see below exception in spark-shell 15/07/13 17:18:27 WARN ThriftCLIService: Error executing statement: org.apache.hive.service.cli.HiveSQLException: *java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1* at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) I'm able to read the other table(my_table) from Beeline though. Any suggestions on how to overcome this? This is with Spark 1.4 pre-built version. Spark-shell was started with --package to pass spark-csv. Srikanth
Re: HiveThriftServer2.startWithContext error with registerTempTable
Hello, Re-sending this to see if I'm second time lucky! I've not managed to move past this error. Srikanth On Mon, Jul 13, 2015 at 9:14 PM, Srikanth srikanth...@gmail.com wrote: Hello, I want to expose results of a Spark computation to external tools. I plan to do this with the Thrift server JDBC interface by registering the result Dataframe as a temp table. I wrote a sample program in spark-shell to test this.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
HiveThriftServer2.startWithContext(hiveContext)
val myDF = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").load("/datafolder/weblog/pages.csv")
myDF.registerTempTable("temp_table")

I'm able to see the temp table in Beeline:

+-------------+--------------+
| tableName   | isTemporary  |
+-------------+--------------+
| temp_table  | true         |
| my_table    | false        |
+-------------+--------------+

Now when I issue "select * from temp_table" from Beeline, I see the below exception in spark-shell:

15/07/13 17:18:27 WARN ThriftCLIService: Error executing statement:
org.apache.hive.service.cli.HiveSQLException: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I'm able to read the other table (my_table) from Beeline though. Any suggestions on how to overcome this? This is with the Spark 1.4 pre-built version. Spark-shell was started with --packages to pass spark-csv. Srikanth
cache() VS cacheTable()
Hello, I was reading the Learning Spark book and saw a tip in chapter 9 that read: "In Spark 1.2, the regular cache() method on RDDs also results in a cacheTable()". Is that true? When I cache an RDD and cache the same data as a dataframe, I see that memory usage for the dataframe cache is way less than the RDD cache. I thought this difference is due to the columnar format used by dataframes. As per the statement in the book, cache sizes should be similar. Srikanth
HiveThriftServer2.startWithContext error with registerTempTable
Hello, I want to expose results of a Spark computation to external tools. I plan to do this with the Thrift server JDBC interface by registering the result Dataframe as a temp table. I wrote a sample program in spark-shell to test this.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
HiveThriftServer2.startWithContext(hiveContext)
val myDF = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").load("/datafolder/weblog/pages.csv")
myDF.registerTempTable("temp_table")

I'm able to see the temp table in Beeline:

+-------------+--------------+
| tableName   | isTemporary  |
+-------------+--------------+
| temp_table  | true         |
| my_table    | false        |
+-------------+--------------+

Now when I issue "select * from temp_table" from Beeline, I see the below exception in spark-shell:

15/07/13 17:18:27 WARN ThriftCLIService: Error executing statement:
org.apache.hive.service.cli.HiveSQLException: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I'm able to read the other table (my_table) from Beeline though. Any suggestions on how to overcome this? This is with the Spark 1.4 pre-built version. Spark-shell was started with --packages to pass spark-csv. Srikanth
Re: How do we control output part files created by Spark job?
Reducing no.of partitions may have impact on memory consumption. Especially if there is uneven distribution of key used in groupBy. Depends on your dataset. On Sat, Jul 11, 2015 at 5:06 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Sriknath thanks much it worked when I set spark.sql.shuffle.partitions=10 I think reducing shuffle partitions will slower my group by query of hiveContext or it wont slow it down please guide. On Sat, Jul 11, 2015 at 7:41 AM, Srikanth srikanth...@gmail.com wrote: Is there a join involved in your sql? Have a look at spark.sql.shuffle.partitions? Srikanth On Wed, Jul 8, 2015 at 1:29 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Srikant thanks for the response. I have the following code: hiveContext.sql(insert into... ).coalesce(6) Above code does not create 6 part files it creates around 200 small files. Please guide. Thanks. On Jul 8, 2015 4:07 AM, Srikanth srikanth...@gmail.com wrote: Did you do yourRdd.coalesce(6).saveAsTextFile() or yourRdd.coalesce(6) yourRdd.saveAsTextFile() ? Srikanth On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi I tried both approach using df. repartition(6) and df.coalesce(6) it doesn't reduce part-x files. Even after calling above method I still see around 200 small part files of size 20 mb each which is again orc files. On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Try coalesce function to limit no of part files On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote: Hi I am having couple of Spark jobs which processes thousands of files every day. File size may very from MBs to GBs. After finishing job I usually save using the following code finalJavaRDD.saveAsParquetFile(/path/in/hdfs); OR dataFrame.write.format(orc).save(/path/in/hdfs) //storing as ORC file as of Spark 1.4 Spark job creates plenty of small part files in final output directory. 
As far as I understand Spark creates part file for each partition/task please correct me if I am wrong. How do we control amount of part files Spark creates? Finally I would like to create Hive table using these parquet/orc directory and I heard Hive is slow when we have large no of small files. Please guide I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
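The exchange above boils down to a single setting. A sketch of the fix that worked (illustrative value; the elided insert statement stands in for the actual query):

```scala
// spark.sql.shuffle.partitions (default 200) sets the number of
// post-shuffle partitions for SQL joins/aggregations — which is why the
// insert produced ~200 part files. Lowering it reduces the file count,
// at the cost of larger partitions (and more memory pressure per task
// when the groupBy key is skewed, as noted above).
hiveContext.setConf("spark.sql.shuffle.partitions", "10")
hiveContext.sql("insert into ...")
```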
Re: How do we control output part files created by Spark job?
Is there a join involved in your sql? Have a look at spark.sql.shuffle.partitions? Srikanth On Wed, Jul 8, 2015 at 1:29 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Srikant thanks for the response. I have the following code: hiveContext.sql(insert into... ).coalesce(6) Above code does not create 6 part files it creates around 200 small files. Please guide. Thanks. On Jul 8, 2015 4:07 AM, Srikanth srikanth...@gmail.com wrote: Did you do yourRdd.coalesce(6).saveAsTextFile() or yourRdd.coalesce(6) yourRdd.saveAsTextFile() ? Srikanth On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi I tried both approach using df. repartition(6) and df.coalesce(6) it doesn't reduce part-x files. Even after calling above method I still see around 200 small part files of size 20 mb each which is again orc files. On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Try coalesce function to limit no of part files On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote: Hi I am having couple of Spark jobs which processes thousands of files every day. File size may very from MBs to GBs. After finishing job I usually save using the following code finalJavaRDD.saveAsParquetFile(/path/in/hdfs); OR dataFrame.write.format(orc).save(/path/in/hdfs) //storing as ORC file as of Spark 1.4 Spark job creates plenty of small part files in final output directory. As far as I understand Spark creates part file for each partition/task please correct me if I am wrong. How do we control amount of part files Spark creates? Finally I would like to create Hive table using these parquet/orc directory and I heard Hive is slow when we have large no of small files. Please guide I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
Re: Parallelizing multiple RDD / DataFrame creation in Spark
Your tableLoad() APIs are not actions. The file will be read fully only when an action is performed. If the action is something like table1.join(table2), then I think both files will be read in parallel. Can you try that and look at the execution plan? In 1.4 this is shown in the Spark UI. Srikanth On Wed, Jul 8, 2015 at 11:06 AM, Brandon White bwwintheho...@gmail.com wrote: The point of running them in parallel would be faster creation of the tables. Has anybody been able to efficiently parallelize something like this in Spark? On Jul 8, 2015 12:29 AM, Akhil Das ak...@sigmoidanalytics.com wrote: What's the point of creating them in parallel? You can multi-thread it and run it in parallel though. Thanks Best Regards On Wed, Jul 8, 2015 at 5:34 AM, Brandon White bwwintheho...@gmail.com wrote: Say I have a spark job that looks like the following:

def loadTable1() {
  val table1 = sqlContext.jsonFile("s3://textfiledirectory/")
  table1.cache().registerTempTable("table1")
}

def loadTable2() {
  val table2 = sqlContext.jsonFile("s3://testfiledirectory2/")
  table2.cache().registerTempTable("table2")
}

def loadAllTables() {
  loadTable1()
  loadTable2()
}

loadAllTables()

How do I parallelize this Spark job so that both tables are created at the same time or in parallel?
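The multi-threading suggestion in the thread can be sketched as below. This is an assumption-laden sketch: it reuses the loadTable1/loadTable2 definitions from the quoted message, and relies on the fact that jsonFile() does eager schema inference, so submitting the loads from separate driver threads lets those jobs run concurrently on the cluster.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Kick off both loads on separate driver threads, then wait for both.
val loads = Seq(Future(loadTable1()), Future(loadTable2()))
Await.result(Future.sequence(loads), Duration.Inf)
```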
Re: How do we control output part files created by Spark job?
Did you do yourRdd.coalesce(6).saveAsTextFile() or yourRdd.coalesce(6); yourRdd.saveAsTextFile()? Srikanth On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi, I tried both approaches using df.repartition(6) and df.coalesce(6); it doesn't reduce part-x files. Even after calling the above methods I still see around 200 small part files of size 20 MB each, which are again ORC files. On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Try the coalesce function to limit the number of part files On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote: Hi, I have a couple of Spark jobs which process thousands of files every day. File size may vary from MBs to GBs. After finishing a job I usually save using the following code: finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR dataFrame.write.format("orc").save("/path/in/hdfs") // storing as ORC file as of Spark 1.4 The Spark job creates plenty of small part files in the final output directory. As far as I understand, Spark creates a part file for each partition/task - please correct me if I am wrong. How do we control the number of part files Spark creates? Finally I would like to create a Hive table using these parquet/orc directories and I heard Hive is slow when we have a large number of small files. Please guide, I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
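The distinction Srikanth is probing can be made concrete (myRdd and the path are illustrative): coalesce() returns a new RDD and does not modify the receiver, so the two variants behave very differently.

```scala
// Variant 1: save the coalesced RDD — writes ~6 part files.
val coalesced = myRdd.coalesce(6)
coalesced.saveAsTextFile("/path/in/hdfs")

// Variant 2: the coalesce result is discarded, so the save still
// writes one part file per original partition.
myRdd.coalesce(6)
myRdd.saveAsTextFile("/path/in/hdfs")
```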
Re: BroadcastHashJoin when RDD is not cached
Good to know this will be in next release. Thanks. On Wed, Jul 1, 2015 at 3:13 PM, Michael Armbrust mich...@databricks.com wrote: We don't know that the table is small unless you cache it. In Spark 1.5 you'll be able to give us a hint though ( https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L581 ) On Wed, Jul 1, 2015 at 8:30 AM, Srikanth srikanth...@gmail.com wrote: Hello, I have a straight forward use case of joining a large table with a smaller table. The small table is within the limit I set for spark.sql.autoBroadcastJoinThreshold. I notice that ShuffledHashJoin is used to perform the join. BroadcastHashJoin was used only when I pre-fetched and cached the small table. I understand that for typical broadcast we would have to read and collect() the small table in driver before broadcasting. Why not do this automatically for joins? That way stage1(read large table) and stage2(read small table) can still be run in parallel. Sort [emailId#19 ASC,date#0 ASC], true Exchange (RangePartitioning 24) Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L] Filter ((lowerTime#22 = date#0) (date#0 = upperTime#23)) *ShuffledHashJoin* [ip#7], [ip#18], BuildRight Exchange (HashPartitioning 24) Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L] PhysicalRDD [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6], MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25 Exchange (HashPartitioning 24) Project [emailId#19,scalaUDF(date#20) AS upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22] PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at rddToDataFrameHolder at DataSourceReader.scala:41 Srikanth
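The Spark 1.5 hint Michael links to can be sketched as below (largeDF/smallDF are illustrative names): marking the small side explicitly lets the planner build a BroadcastHashJoin without the table having to be cached first.

```scala
import org.apache.spark.sql.functions.broadcast

// Force the small side to be collected and broadcast, keeping the
// large table's scan shuffle-free.
val joined = largeDF.join(broadcast(smallDF), largeDF("ip") === smallDF("ip"))
```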
BroadcastHashJoin when RDD is not cached
Hello, I have a straightforward use case of joining a large table with a smaller table. The small table is within the limit I set for spark.sql.autoBroadcastJoinThreshold. I notice that ShuffledHashJoin is used to perform the join. BroadcastHashJoin was used only when I pre-fetched and cached the small table. I understand that for a typical broadcast we would have to read and collect() the small table in the driver before broadcasting. Why not do this automatically for joins? That way stage1 (read large table) and stage2 (read small table) can still be run in parallel.

Sort [emailId#19 ASC,date#0 ASC], true
 Exchange (RangePartitioning 24)
  Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
   Filter ((lowerTime#22 <= date#0) && (date#0 <= upperTime#23))
    *ShuffledHashJoin* [ip#7], [ip#18], BuildRight
     Exchange (HashPartitioning 24)
      Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
       PhysicalRDD [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6], MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25
     Exchange (HashPartitioning 24)
      Project [emailId#19,scalaUDF(date#20) AS upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]
       PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at rddToDataFrameHolder at DataSourceReader.scala:41

Srikanth
Re: Spark 1.4 RDD to DF fails with toDF()
My error was related to the Scala version. Upon further reading, I realized that it takes some effort to get Spark working with Scala 2.11. I've reverted to using 2.10 and moved past that error. Now I hit the issue you mentioned. Waiting for 1.4.1. Srikanth On Fri, Jun 26, 2015 at 9:10 AM, Roberto Coluccio roberto.coluc...@gmail.com wrote: I got a similar issue. Might yours as well be related to this https://issues.apache.org/jira/browse/SPARK-8368 ? On Fri, Jun 26, 2015 at 2:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Are those provided spark libraries compatible with scala 2.11? Thanks Best Regards On Fri, Jun 26, 2015 at 4:48 PM, Srikanth srikanth...@gmail.com wrote: Thanks Akhil for checking this out. Here is my build.sbt:

name := "Weblog Analysis"

version := "1.0"

scalaVersion := "2.11.5"

javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.4.0",
  "org.apache.spark" %% "spark-streaming" % "1.4.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0",
  "org.apache.spark" %% "spark-mllib" % "1.4.0",
  "org.apache.commons" % "commons-lang3" % "3.0",
  "org.eclipse.jetty" % "jetty-client" % "8.1.14.v20131031",
  "org.scalatest" %% "scalatest" % "2.2.1" % "test",
  "com.databricks" % "spark-csv_2.11" % "1.0.3",
  "joda-time" % "joda-time" % "2.8.1",
  "org.joda" % "joda-convert" % "1.7"
)

resolvers ++= Seq(
  "Sonatype OSS Snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/",
  "Sonatype public" at "http://oss.sonatype.org/content/groups/public/",
  "Sonatype" at "http://nexus.scala-tools.org/content/repositories/public",
  "Scala Tools" at "http://scala-tools.org/repo-snapshots/",
  "Typesafe" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka" at "http://akka.io/repository/",
  "JBoss" at "http://repository.jboss.org/nexus/content/groups/public/",
  "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
)

On Fri, Jun 26, 2015 at 4:13 AM, Akhil Das ak...@sigmoidanalytics.com wrote: It's a scala version conflict, can you paste
your build.sbt file? Thanks Best Regards On Fri, Jun 26, 2015 at 7:05 AM, stati srikanth...@gmail.com wrote: Hello, When I run a spark job with spark-submit it fails with below exception for code line /*val webLogDF = webLogRec.toDF().select(ip, date, name)*/ I had similar issue running from spark-shell, then realized that I needed sqlContext.implicit._ Now my code has the following imports /* import org.apache.spark._ import org.apache.spark.sql._ import org.apache.spark.sql.functions._ val sqlContext = new SQLContext(sc) import sqlContext.implicits._ */ Code works fine from spark-shell REPL. It also runs fine when run in local mode from Eclipse. I get this error only when I submit to cluster using spark-submit. bin/spark-submit /local/weblog-analysis_2.11-1.0.jar --class WebLogAnalysis --master spark://machu:7077 I'm testing with spark 1.4. My code was built using scala 2.11 and spark+sparkSQL 1.4.0 as dependency in build.sbt Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; at WebLogAnalysis$.readWebLogFiles(WebLogAnalysis.scala:38) at WebLogAnalysis$.main(WebLogAnalysis.scala:62) at WebLogAnalysis.main(WebLogAnalysis.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) I can provide more code or log if that will help. Let me know. 
Srikanth -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-RDD-to-DF-fails-with-toDF-tp23499.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
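Since the resolution above was reverting to Scala 2.10 (the version the prebuilt Spark 1.4 binaries are compiled against), a build.sbt consistent with that setup would look roughly like the sketch below. This is not the poster's actual file; the point it illustrates is that scalaVersion must match the cluster's Scala version and every _2.xx artifact suffix must agree with it:

```scala
// build.sbt -- sketch for a Spark 1.4 application targeting Scala 2.10,
// matching the Scala version of the prebuilt Spark binaries.
name := "Weblog Analysis"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  // %% appends the Scala binary version suffix (_2.10) automatically,
  // so all artifacts stay consistent with scalaVersion above.
  "org.apache.spark" %% "spark-core"            % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-sql"             % "1.4.0",
  "org.apache.spark" %% "spark-streaming"       % "1.4.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0",
  "org.apache.spark" %% "spark-mllib"           % "1.4.0",
  // spark-csv publishes per-Scala-version artifacts; using %% instead of
  // hardcoding _2.11 lets sbt pick the suffix that matches scalaVersion.
  "com.databricks"   %% "spark-csv"             % "1.0.3"
)
```

The NoSuchMethodError on scala.reflect.api.JavaUniverse.runtimeMirror quoted in the thread is a typical symptom of this kind of mismatch: code compiled against one Scala major version (here 2.11) running on a cluster whose Spark was built for another (2.10).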
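For completeness, the import pattern the toDF() thread converges on, sketched as a minimal compiled Spark 1.4 application. The names (WebLogExample, LogRecord) and sample data are illustrative only, not the poster's code; it assumes a Spark 1.4 / Scala 2.10 build on the classpath:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Define the case class at top level, not inside a method: the
// reflection-based schema inference used by toDF() expects this.
case class LogRecord(ip: String, date: String, name: String)

object WebLogExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WebLogExample"))
    val sqlContext = new SQLContext(sc)
    // This import is what puts toDF() in scope on an RDD of case classes.
    import sqlContext.implicits._

    val df = sc.parallelize(Seq(LogRecord("1.2.3.4", "2015-06-26", "a"))).toDF()
    df.select("ip", "date", "name").show()

    sc.stop()
  }
}
```

Note that a missing `import sqlContext.implicits._` produces a compile error ("value toDF is not a member of ..."), whereas the runtime NoSuchMethodError in the thread points at a Scala version mismatch instead.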
Re: java.lang.OutOfMemoryError: PermGen space
That worked. Thanks! I wonder what changed in 1.4 to cause this. It wouldn't work with anything less than 256m for a simple piece of code; 1.3.1 used to work with the default (64m, I think).

Srikanth

On Wed, Jun 24, 2015 at 12:47 PM, Roberto Coluccio roberto.coluc...@gmail.com wrote:
> Did you try passing it with --driver-java-options "-XX:MaxPermSize=256m" as a spark-shell input argument?
>
> Roberto

On Wed, Jun 24, 2015 at 5:57 PM, stati srikanth...@gmail.com wrote:
> Hello,
>
> I moved from 1.3.1 to 1.4.0 and started receiving java.lang.OutOfMemoryError: PermGen space when I use spark-shell. The same Scala code works fine in the 1.3.1 spark-shell, where I was loading the same set of external JARs and using the same imports.
>
> I tried increasing the perm size to 256m and still got the OOM:
>
>   SPARK_REPL_OPTS=-XX:MaxPermSize=256m bin/spark-shell --master spark://machu:7077 --total-executor-cores 12 --packages com.databricks:spark-csv_2.10:1.0.3 --packages joda-time:joda-time:2.8.1
>
> The Spark UI Environment tab didn't show -XX:MaxPermSize, so I'm not sure this setting was picked up. This is standalone mode. Any pointers on next steps?
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-PermGen-space-tp23472.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
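For reference, Roberto's suggestion written out as a complete command. In Spark 1.x, --driver-java-options is the launcher's supported way to pass JVM flags to the driver process that spark-shell runs in; the exact combination of flags below is assembled from the thread, not a command the poster reported running verbatim:

```shell
# Pass the PermGen limit through the spark-shell launcher itself rather than
# via SPARK_REPL_OPTS, which the 1.4 launcher apparently no longer honors
# (consistent with -XX:MaxPermSize not appearing in the UI's Environment tab).
bin/spark-shell \
  --master spark://machu:7077 \
  --total-executor-cores 12 \
  --driver-java-options "-XX:MaxPermSize=256m" \
  --packages com.databricks:spark-csv_2.10:1.0.3,joda-time:joda-time:2.8.1
```

Two side notes: --packages takes a single comma-separated list of coordinates rather than being repeated, and a persistent alternative is setting spark.driver.extraJavaOptions=-XX:MaxPermSize=256m in conf/spark-defaults.conf.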