Dibyendu - I am using the Kafka consumer built into Spark Streaming. I pulled the jar from here: http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka_2.10/1.0.0/spark-streaming-kafka_2.10-1.0.0.jar
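For reference, the built-in consumer in that jar is created through `KafkaUtils.createStream`. The rough sketch that follows shows one way to wire up two tips from the older email quoted in this thread, "one receiver per Kafka partition" and "repartition each stream instead of a union", with the Spark 1.0 Scala API. It is not Tim's code. The ZooKeeper quorum, group id, topic name, 10-second batch interval, and `processPartition` are hypothetical placeholders, and it needs spark-streaming-kafka on the classpath.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverSketch {
  // Hypothetical settings: substitute your own quorum, group and topic.
  val zkQuorum = "zk1:2181,zk2:2181,zk3:2181"
  val groupId = "my-awesome-app"
  val inputTopic = "input-topic"
  val kafkaPartitions = 5 // partitions in the input topic
  val partitionsPerStream = 24

  // Hypothetical per-partition processing; replace with real logic.
  def processPartition(messages: Iterator[String]): Unit =
    messages.foreach(_ => ())

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("My Awesome App")
    val ssc = new StreamingContext(conf, Seconds(10)) // assumed batch interval

    // One receiver per Kafka partition, each running a single consumer thread.
    // All receivers share one consumer group, so Kafka's rebalancing should
    // spread the topic's partitions across them.
    val streams = (1 to kafkaPartitions).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(inputTopic -> 1),
        StorageLevel.MEMORY_AND_DISK_SER)
    }

    // Repartition and process each stream on its own instead of union().
    streams.foreach { stream =>
      stream
        .map(_._2) // keep the message value, drop the key
        .repartition(partitionsPerStream)
        .foreachRDD(rdd => rdd.foreachPartition(processPartition))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Giving every receiver the same group id is what lets Kafka's high-level consumer divide the partitions among the receivers; with distinct group ids, each receiver would read every partition.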
Thanks for the sbt-assembly link, Soumitra.

On Wed, Sep 17, 2014 at 5:50 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
> Hi Tim
>
> Just curious to know: which Kafka consumer have you used?
>
> Dib
>
> On Sep 18, 2014 4:40 AM, "Tim Smith" <secs...@gmail.com> wrote:
>>
>> Thanks :)
>>
>> On Wed, Sep 17, 2014 at 2:10 PM, Paul Wais <pw...@yelp.com> wrote:
>> > Thanks Tim, this is super helpful!
>> >
>> > Question about jars and spark-submit: why do you provide myawesomeapp.jar as the program jar but then include other jars via the --jars argument? Have you tried building one uber jar with all dependencies and just sending that to Spark as your app jar?
>>
>> I guess that is mostly because I am a Scala/sbt noob :) How do I create the uber jar? My .sbt file says:
>> name := "My Awesome App"
>> version := "1.025"
>> scalaVersion := "2.10.4"
>> resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases"
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"
>>
>> Then I run "sbt package" to generate myawesomeapp.jar.
>>
>> >
>> > Also, have you ever seen any issues with Spark caching your app jar between runs even if it changes?
>>
>> Not that I can tell, but maybe that's because I use Yarn and am shielded from some jar distribution bugs in Spark?
>>
>>
>> >
>> > On Wed, Sep 17, 2014 at 1:11 PM, Tim Smith <secs...@gmail.com> wrote:
>> >> I don't have anything in production yet, but I now at least have a stable (running for more than 24 hours) streaming app. Earlier, the app would crash for all sorts of reasons.
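Paul's uber-jar suggestion earlier in this thread could be applied to Tim's .sbt file roughly as follows. This is an untested sketch under assumptions: it uses the sbt-assembly plugin, the plugin version and the 0.11-era `import AssemblyKeys._` / `assemblySettings` lines are assumptions about which release you use, and spark-core and spark-streaming are marked "provided" because Spark on the cluster already supplies them. Settings are separated by blank lines because older sbt 0.13 releases require that in .sbt files.

```scala
// project/plugins.sbt (version is an assumption; use the release whose
// README matches your sbt version):
//   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt
import AssemblyKeys._ // sbt-assembly 0.11.x only; later releases drop this line

assemblySettings // likewise 0.11.x only

name := "My Awesome App"

version := "1.025"

scalaVersion := "2.10.4"

resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases"

// Already on the cluster: "provided" keeps Spark out of the uber jar.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided"

// Not part of the Spark assembly, so bundle these; kafka should pull in
// zkclient and metrics-core transitively.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"

libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.1.1"
```

Running `sbt assembly` should then write a single jar under target/scala-2.10/ that can be handed to spark-submit as the app jar without the --jars list. If the build stops on duplicate files under META-INF, the plugin's merge-strategy setting (described in its README) is the usual fix.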
>> >> Caveats/setup:
>> >> - Spark 1.0.0 (no input flow control, unlike Spark 1.1)
>> >> - Yarn for RM
>> >> - Input and output to Kafka
>> >> - CDH 5.1
>> >> - 11-node cluster, each node with 32 cores and a 48G max container size (Yarn managed)
>> >> - 5-partition Kafka topics, both in and out
>> >> - Roughly 25k messages per second on average
>> >> - App written in Scala (warning: I am a Scala noob)
>> >>
>> >> A few things I had to add/tweak to get the app to be stable:
>> >> - By default, the executor JVMs did not have any GC options set. This might be more of a CDH issue: I noticed that the Yarn container and other Spark ancillary tasks had GC options set at launch, but the executors had none. So I played with different GC options, and this worked best:
>> >> SPARK_JAVA_OPTS="-XX:MaxPermSize=512m -XX:NewSize=1024m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+AggressiveHeap -XX:MaxHeapFreeRatio=70 -verbosegc -XX:+PrintGCDetails"
>> >>
>> >> I tried G1GC but for some reason it just didn't work. I am not a Java programmer or expert, so my conclusion is purely trial-and-error based. With these flags, the GC logs go to the "stdout" file in the Yarn container logs on each node/worker. You can set SPARK_JAVA_OPTS in spark-env.sh on the driver node and Yarn will respect it. On CDH/CM specifically, even though you don't run Spark as a service (since you are using Yarn for RM), you can go to "Spark Client Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh" and set SPARK_JAVA_OPTS there.
>> >>
>> >> - Set these two params: "spark.yarn.executor.memoryOverhead" and "spark.yarn.driver.memoryOverhead". Earlier, my app would die because Yarn killed the executors running the Kafka receivers for using more memory than their containers allowed.
>> >> Now, these are my memory settings (I will paste the entire app launch params later in the email). The first two are spark-submit flags; the two memoryOverhead values (in MB) are Spark properties:
>> >> --driver-memory 2G
>> >> --executor-memory 16G
>> >> spark.yarn.executor.memoryOverhead 4096
>> >> spark.yarn.driver.memoryOverhead 1024
>> >>
>> >> Each executor's Yarn container is sized at "executor-memory" plus "spark.yarn.executor.memoryOverhead" (off-heap headroom), so in this case each executor JVM gets a 16G heap inside a 20G container, which fits under the 48G max container size.
>> >>
>> >> Here is how I launch my app. spark-submit has no --spark.* flags, so the Spark properties go in conf/spark-defaults.conf:
>> >> spark.serializer org.apache.spark.serializer.KryoSerializer
>> >> spark.rdd.compress true
>> >> spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec
>> >> spark.akka.threads 64
>> >> spark.akka.frameSize 500
>> >> spark.task.maxFailures 64
>> >> spark.scheduler.mode FAIR
>> >> spark.yarn.executor.memoryOverhead 4096
>> >> spark.yarn.driver.memoryOverhead 1024
>> >> spark.shuffle.consolidateFiles true
>> >> spark.default.parallelism 528
>> >>
>> >> and the command itself, with the app jar last because spark-submit passes everything after it to the app as arguments:
>> >> run=`date +"%m-%d-%YT%T"`; \
>> >> nohup spark-submit --class myAwesomeApp \
>> >> --master yarn \
>> >> --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
>> >> --driver-memory 2G \
>> >> --executor-memory 16G \
>> >> --executor-cores 16 \
>> >> --num-executors 10 \
>> >> myawesomeapp.jar \
>> >> >logs/normRunLog-$run.log \
>> >> 2>logs/normRunLogError-$run.log & \
>> >> echo $! > logs/current-run.pid
>> >>
>> >> Some code optimizations (or goof-ups that I fixed). I did not scientifically measure the impact of each, but I think they helped:
>> >> - Made all my classes and objects serializable and then used Kryo (as you see above)
>> >> - I run one receiver task for each Kafka partition
>> >> - Instead of doing a "union" on all the incoming streams and then repartition(), I now repartition() each incoming stream and process them separately.
>> >>   I believe this reduces shuffle.
>> >> - Reduced the number of partitions passed to repartition(): I was using 128 after doing a "union" on all incoming DStreams; I now repartition each of the five streams separately (in a loop) to 24.
>> >> - For each RDD, I set the storage level to "MEMORY_AND_DISK_SER"
>> >> - Process data per partition instead of per RDD:
>> >>   dataout.foreachRDD(rdd => rdd.foreachPartition(rec => myFunc(rec)))
>> >> - Specific to Kafka: whenever I create a "new Producer", I make sure to "close" it; otherwise I got a ton of "too many open files" errors :)
>> >> - Use immutable objects as far as possible. If I use mutable objects within a method/class, I turn them into immutable ones before passing them on to another class/method.
>> >> - For logging, I create a LogService object that I then use in other object/class declarations. Once it is instantiated, I can make "logInfo" calls from within other objects/methods/classes, and the output goes to the "stderr" file in the Yarn container logs. Good for debugging stream processing logic.
>> >>
>> >> Currently, my processing delay is lower than my DStream batch interval, so all is good. I do get a ton of these errors in my driver logs:
>> >> 14/09/16 21:17:40 ERROR LiveListenerBus: Listener JobProgressListener threw an exception
>> >>
>> >> These seem related to: https://issues.apache.org/jira/browse/SPARK-2316
>> >>
>> >> As best I understand (and have been told), this does not affect data integrity but may cause unnecessary recomputes.
>> >>
>> >> Hope this helps,
>> >>
>> >> Tim
>> >>
>> >>
>> >> On Wed, Sep 17, 2014 at 8:30 AM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:
>> >>> Hmm, no response to this thread!
>> >>>
>> >>> Adding to it, please share experiences of building an enterprise-grade product based on Spark Streaming.
>> >>>
>> >>> I am exploring Spark Streaming for enterprise software and am cautiously optimistic about it.
>> >>> I see huge potential to improve debuggability of Spark.
>> >>>
>> >>> ----- Original Message -----
>> >>> From: "Tim Smith" <secs...@gmail.com>
>> >>> To: "spark users" <user@spark.apache.org>
>> >>> Sent: Friday, September 12, 2014 10:09:53 AM
>> >>> Subject: Stable spark streaming app
>> >>>
>> >>> Hi,
>> >>>
>> >>> Does anyone have a stable streaming app running in "production"? Can you share an overview of the app and setup, like number of nodes, events per second, broad stream processing workflow, config highlights, etc.?
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Tim
>> >>>
>> >>> ---------------------------------------------------------------------
>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>> For additional commands, e-mail: user-h...@spark.apache.org
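Tim's tips about processing data per partition and always closing each Kafka Producer (otherwise "too many open files") combine into the loan pattern: open one resource per partition and release it in a `finally` block. The minimal, Spark-free sketch that follows assumes hypothetical names: `Sink` stands in for something like Kafka 0.8's `kafka.producer.Producer`, and `PartitionWriter.writeAll` is the function you would hand to `foreachPartition`.

```scala
// Hypothetical stand-in for a Kafka 0.8 kafka.producer.Producer: something
// you send records to and must close when done.
trait Sink[A] {
  def send(record: A): Unit
  def close(): Unit
}

object PartitionWriter {
  /** Opens one sink for the whole partition, sends every record, and closes
    * the sink in a finally block so it is released even if a send throws.
    * Returns the number of records sent.
    *
    * Intended Spark usage (makeSink is a hypothetical factory):
    *   dataout.foreachRDD(rdd =>
    *     rdd.foreachPartition(recs => PartitionWriter.writeAll(recs, () => makeSink())))
    */
  def writeAll[A](records: Iterator[A], open: () => Sink[A]): Int = {
    val sink = open()
    try {
      var count = 0
      records.foreach { r =>
        sink.send(r)
        count += 1
      }
      count
    } finally {
      sink.close()
    }
  }
}
```

Opening the sink once per partition, rather than once per record, also keeps connection setup off the per-message path.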