I'll try dropping maxRatePerPartition to 400, or maybe even lower. However, even at application startup I have this large scheduling delay. I will report my progress later on.
On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <c...@koeninger.org> wrote:
> If your batch time is 1 second and your average processing time is 1.16
> seconds, you're always going to be falling behind. That would explain
> why you've built up an hour of scheduling delay after eight hours of
> running.
>
> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>> Hi Mich again,
>>
>> Regarding the batch window, etc.: I have provided the sources, but I'm
>> not currently calling the window function. Did you see the program
>> source? It's only 100 lines.
>>
>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>
>> So I would expect I'm using defaults, other than what is shown in the
>> configuration.
>>
>> For example, in the launcher configuration I set
>> --conf spark.streaming.kafka.maxRatePerPartition=500
>> and I believe there are 500 messages per partition for the duration set
>> in the application:
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));
>>
>> Then with the --num-executors 6 submit flag and
>> spark.streaming.kafka.maxRatePerPartition=500, I think that's how we
>> arrive at the 3000 events per batch shown in the UI, pasted above.
>>
>> Feel free to correct me if I'm wrong.
>>
>> Are you suggesting that I set the window? Maybe following this as a
>> reference:
>>
>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>
>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>> Ok
>>>
>>> What is the setup for these, please?
>>>
>>> batch window
>>> window length
>>> sliding interval
>>>
>>> And also, in each batch window how much data do you get in (number of
>>> messages in the topic, or whatever)?
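[Editor's note: a quick sanity check of the batch-size arithmetic discussed above. Note that spark.streaming.kafka.maxRatePerPartition is a per-Kafka-partition rate, so the 3000 events/batch follows from the 6 Kafka partitions mentioned later in the thread (which here happens to coincide with --num-executors 6). The figures below are all taken from the thread; this is illustrative arithmetic only.]

```python
# maxRatePerPartition is messages/sec per *Kafka partition*, not per executor.
max_rate_per_partition = 500   # spark.streaming.kafka.maxRatePerPartition
kafka_partitions = 6           # topic partition count from later in the thread
batch_interval_s = 1.0         # new Duration(1000)

events_per_batch = max_rate_per_partition * kafka_partitions * batch_interval_s
print(events_per_batch)  # 3000.0 -- matches the "3000 events" in the streaming UI
```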
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn:
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>> I believe you have an issue with performance?
>>>>
>>>> Have you checked the Spark GUI (default port 4040) for details,
>>>> including shuffles etc.?
>>>>
>>>> HTH
>>>>
>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>
>>>>> There are 25 nodes in the spark cluster.
>>>>>
>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>> > How many nodes are in your cluster?
>>>>> >
>>>>> > --num-executors 6 \
>>>>> > --driver-memory 4G \
>>>>> > --executor-memory 2G \
>>>>> > --total-executor-cores 12 \
>>>>> >
>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>> >>
>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>>> >> Kafka using the direct API and inserts content into an HBase
>>>>> >> cluster, as described in this thread. I was away from this project
>>>>> >> for a while due to events in my family.
>>>>> >>
>>>>> >> Currently my scheduling delay is high, but the processing time is
>>>>> >> stable around a second. I changed my setup to use 6 Kafka
>>>>> >> partitions on a set of smaller Kafka brokers, with fewer disks.
>>>>> >> I've included some details below, including the script I use to
>>>>> >> launch the application. I'm using a Spark-on-HBase library whose
>>>>> >> version matches my HBase cluster. Is it apparent that there is
>>>>> >> something wrong with my launch method that could be causing the
>>>>> >> delay, related to the included jars?
>>>>> >>
>>>>> >> Or is there something wrong with the very simple approach I'm
>>>>> >> taking for the application?
>>>>> >>
>>>>> >> Any advice is appreciated.
>>>>> >>
>>>>> >> The application:
>>>>> >>
>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>> >>
>>>>> >> From the streaming UI I get something like this (Completed
>>>>> >> Batches, last 1000 out of 27136):
>>>>> >>
>>>>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>>>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>> >>
>>>>> >> Here's how I'm launching the Spark application.
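[Editor's note: the "1.2 h" scheduling delay in the table above is consistent with Cody's observation elsewhere in the thread that average processing time (~1.16 s) exceeds the 1 s batch interval: each batch then adds roughly 0.16 s of delay. A sketch assuming constant rates, using only figures quoted in the thread:]

```python
# How a steady ~0.16 s/batch overrun accumulates into the ~1.2 h scheduling
# delay shown after 27136 completed batches. Assumes a constant 1 s batch
# interval and ~1.16 s average processing time; illustrative only.
batch_interval_s = 1.0
avg_processing_s = 1.16
completed_batches = 27136

overrun_per_batch = avg_processing_s - batch_interval_s        # ~0.16 s/batch
accumulated_delay_h = completed_batches * overrun_per_batch / 3600
print(round(accumulated_delay_h, 2))  # ~1.21 h, in line with the "1.2 h" column
```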
>>>>> >>
>>>>> >> #!/usr/bin/env bash
>>>>> >>
>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>> >>
>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>> >>   --class com.example.KafkaToHbase \
>>>>> >>   --master spark://spark_master:7077 \
>>>>> >>   --deploy-mode client \
>>>>> >>   --num-executors 6 \
>>>>> >>   --driver-memory 4G \
>>>>> >>   --executor-memory 2G \
>>>>> >>   --total-executor-cores 12 \
>>>>> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>>> >>   --conf spark.app.name="Kafka To Hbase" \
>>>>> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>> >>   --conf spark.eventLog.enabled=false \
>>>>> >>   --conf spark.eventLog.overwrite=true \
>>>>> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>> >>   --conf spark.streaming.backpressure.enabled=false \
>>>>> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>> >>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>>>> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>>> >>
>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>>> >> > Then I'm in the process of moving to the direct API.
>>>>> >> >
>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance
>>>>> >> >> issue (as long as producers are distributing across partitions
>>>>> >> >> evenly).
>>>>> >> >>
>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>> >> >>> Thanks again Cody. Regarding the details: 66 Kafka partitions
>>>>> >> >>> on 3 Kafka servers, likely 8-core systems with 10 disks each.
>>>>> >> >>> Maybe the issue with the receiver was the large number of
>>>>> >> >>> partitions. I had miscounted the disks, and so 11*3*2 is how I
>>>>> >> >>> decided to partition my topic on insertion (by my own,
>>>>> >> >>> unjustified reasoning, on a first attempt). This worked well
>>>>> >> >>> enough for me: I put 1.7 billion entries into Kafka from a
>>>>> >> >>> MapReduce job in five and a half hours.
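[Editor's note: the launch script above pins spark.streaming.backpressure.enabled=false alongside a fixed maxRatePerPartition. As a hedged aside, not something tried in the thread: on Spark 1.5+, enabling backpressure lets the receive rate adapt to observed processing time, with maxRatePerPartition remaining only an upper bound. A hypothetical tweak to the spark-submit flags:]

```shell
# Hypothetical variant of the --conf flags above (requires Spark >= 1.5):
# let the backpressure controller choose the ingest rate, capped per partition.
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.streaming.kafka.maxRatePerPartition=500 \
```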
>>>>> >> >>>
>>>>> >> >>> I was concerned about using Spark 1.5.2, because I'm currently
>>>>> >> >>> putting my data into a CDH 5.3 HDFS cluster, using hbase-spark
>>>>> >> >>> .98 library jars built for Spark 1.2 on CDH 5.3. But after
>>>>> >> >>> debugging quite a bit yesterday, I tried building against
>>>>> >> >>> 1.5.2. So far it's running without issue on a Spark 1.5.2
>>>>> >> >>> cluster. I'm not sure there was much improvement using the same
>>>>> >> >>> code, but I'll see how the direct API handles it. In the end I
>>>>> >> >>> can reduce the number of partitions in Kafka if it causes big
>>>>> >> >>> performance issues.
>>>>> >> >>>
>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>> >> >>>> print() isn't really the best way to benchmark things, since
>>>>> >> >>>> it calls take(10) under the covers, but 380 records/second for
>>>>> >> >>>> a single receiver doesn't sound right in any case.
>>>>> >> >>>>
>>>>> >> >>>> Am I understanding correctly that you're trying to process a
>>>>> >> >>>> large number of already-existing Kafka messages, not keep up
>>>>> >> >>>> with an incoming stream? Can you give any details (e.g.
>>>>> >> >>>> hardware, number of topicpartitions, etc.)?
>>>>> >> >>>>
>>>>> >> >>>> Really though, I'd try to start with Spark 1.6 and direct
>>>>> >> >>>> streams, or even just kafkacat, as a baseline.
>>>>> >> >>>>
>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>>>> >> >>>>> archives but couldn't find anything but a post from Spark 0.7.2.
>>>>> >> >>>>> I was going to use accumulators to make a counter, but then
>>>>> >> >>>>> saw the Receiver Statistics on the Streaming tab. Then I
>>>>> >> >>>>> removed all other "functionality" except:
>>>>> >> >>>>>
>>>>> >> >>>>> JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>>>> >> >>>>>     // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>>>>> >> >>>>>     //     Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>> >> >>>>>     //     Class<T> valueDecoderClass,
>>>>> >> >>>>>     //     java.util.Map<String,String> kafkaParams,
>>>>> >> >>>>>     //     java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>>> >> >>>>>     .createStream(jssc, byte[].class, byte[].class,
>>>>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>>>>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>>>>> >> >>>>>         kafkaParamsMap, topicMap,
>>>>> >> >>>>>         StorageLevel.MEMORY_AND_DISK_SER());
>>>>> >> >>>>>
>>>>> >> >>>>> dstream.print();
>>>>> >> >>>>>
>>>>> >> >>>>> Then in the Receiver Statistics for the single receiver, I'm
>>>>> >> >>>>> seeing around 380 records/second. To get anywhere near my 10%
>>>>> >> >>>>> target mentioned above, I'd need to run around 21 receivers,
>>>>> >> >>>>> assuming 380 records/second, just using the print output.
>>>>> >> >>>>> This seems awfully high to me, considering that I wrote
>>>>> >> >>>>> 80000+ records a second to Kafka from a MapReduce job, and
>>>>> >> >>>>> that my bottleneck was likely HBase. Again using the 380
>>>>> >> >>>>> estimate, I would need 200+ receivers to reach a similar
>>>>> >> >>>>> amount of reads.
>>>>> >> >>>>>
>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>>>> >> >>>>> expected way to use the Kafka streaming API, or am I doing
>>>>> >> >>>>> something terribly wrong?
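[Editor's note: the receiver-count estimates quoted above follow directly from the measured per-receiver rate. A quick check of that arithmetic, assuming (optimistically) that per-receiver throughput stays constant as receivers are added:]

```python
# Sanity check of the receiver estimates in the message above. Assumes each
# added receiver sustains the measured ~380 records/s (optimistic, since
# receivers contend for cores and network).
import math

per_receiver_rate = 380   # records/s measured for a single receiver
write_rate = 80000        # records/s achieved when producing to Kafka
target_fraction = 0.10    # "I'd like to at least get 10%"

receivers_for_10pct = math.ceil(write_rate * target_fraction / per_receiver_rate)
receivers_for_parity = math.ceil(write_rate / per_receiver_rate)
print(receivers_for_10pct)   # 22 -- roughly the "around 21" quoted above
print(receivers_for_parity)  # 211 -- the "200+ receivers" quoted above
```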
>>>>> >> >>>>>
>>>>> >> >>>>> My application looks like:
>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>> >> >>>>>
>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>> >> >>>>>> Have you tested for read throughput (without writing to
>>>>> >> >>>>>> hbase, just deserialize)?
>>>>> >> >>>>>>
>>>>> >> >>>>>> Are you limited to using Spark 1.2, or is upgrading
>>>>> >> >>>>>> possible? The Kafka direct stream is available starting with
>>>>> >> >>>>>> 1.3. If you're stuck on 1.2, I believe there have been some
>>>>> >> >>>>>> attempts to backport it; search the mailing list archives.
>>>>> >> >>>>>>
>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>> >> >>>>>>> I've written an application to get content from a Kafka
>>>>> >> >>>>>>> topic with 1.7 billion entries, get the protobuf-serialized
>>>>> >> >>>>>>> entries, and insert them into HBase. Currently the
>>>>> >> >>>>>>> environment that I'm running in is Spark 1.2.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>>>> >> >>>>>>> between 0-2500 writes/second. This will take much too long
>>>>> >> >>>>>>> to consume the entries.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> I currently believe that the Spark Kafka receiver is the
>>>>> >> >>>>>>> bottleneck. I've tried both 1.2 receivers, with the WAL and
>>>>> >> >>>>>>> without, and didn't notice any large performance
>>>>> >> >>>>>>> difference. I've tried many different Spark configuration
>>>>> >> >>>>>>> options, but can't seem to get better performance.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> I saw 80000 requests/second inserting these records into
>>>>> >> >>>>>>> Kafka, using yarn / hbase / protobuf / kafka in a bulk
>>>>> >> >>>>>>> fashion.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> While HBase inserts might not deliver the same throughput,
>>>>> >> >>>>>>> I'd like to at least get 10%.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> My application looks like:
>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> This is my first Spark application. I'd appreciate any
>>>>> >> >>>>>>> assistance.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> ---------------------------------------------------------------------
>>>>> >> >>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> >> >>>>>>> For additional commands, e-mail: user-h...@spark.apache.org