I'll try dropping the maxRatePerPartition=400, or maybe even lower.
However even at application starts up I have this large scheduling
delay. I will report my progress later on.

On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <c...@koeninger.org> wrote:
> If your batch time is 1 second and your average processing time is
> 1.16 seconds, you're always going to be falling behind.  That would
> explain why you've built up an hour of scheduling delay after eight
> hours of running.
>
> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <disc...@uw.edu> 
> wrote:
>> Hi Mich again,
>>
>> Regarding batch window, etc. I have provided the sources, but I'm not
>> currently calling the window function. Did you see the program source?
>> It's only 100 lines.
>>
>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>
>> Then I would expect I'm using defaults, other than what has been shown
>> in the configuration.
>>
>> For example:
>>
>> In the launcher configuration I set --conf
>> spark.streaming.kafka.maxRatePerPartition=500 \ and I believe there
>> are 500 messages for the duration set in the application:
>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>> Duration(1000));
>>
>>
>> Then with the --num-executors 6 \ submit flag, and the
>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>> arrive at the 3000 events per batch in the UI, pasted above.
>>
>> Feel free to correct me if I'm wrong.
>>
>> Then are you suggesting that I set the window?
>>
>> Maybe following this as reference:
>>
>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>
>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>> <mich.talebza...@gmail.com> wrote:
>>> Ok
>>>
>>> What is the set up for these please?
>>>
>>> batch window
>>> window length
>>> sliding interval
>>>
>>> And also in each batch window how much data do you get in (no of messages in
>>> the topic whatever)?
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>>
>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>> I believe you have an issue with performance?
>>>>
>>>> have you checked spark GUI (default 4040) for details including shuffles
>>>> etc?
>>>>
>>>> HTH
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn
>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>>
>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>
>>>>> There are 25 nodes in the spark cluster.
>>>>>
>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>>> <mich.talebza...@gmail.com> wrote:
>>>>> > how many nodes are in your cluster?
>>>>> >
>>>>> > --num-executors 6 \
>>>>> >  --driver-memory 4G \
>>>>> >  --executor-memory 2G \
>>>>> >  --total-executor-cores 12 \
>>>>> >
>>>>> >
>>>>> > Dr Mich Talebzadeh
>>>>> >
>>>>> >
>>>>> >
>>>>> > LinkedIn
>>>>> >
>>>>> > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> >
>>>>> >
>>>>> >
>>>>> > http://talebzadehmich.wordpress.com
>>>>> >
>>>>> >
>>>>> >
>>>>> >
>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <disc...@uw.edu>
>>>>> > wrote:
>>>>> >>
>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>>>> >> as described in this thread. I was away from this project for awhile
>>>>> >> due to events in my family.
>>>>> >>
>>>>> >> Currently my scheduling delay is high, but the processing time is
>>>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>>>> >> some details below, including the script I use to launch the
>>>>> >> application. I'm using a Spark on Hbase library, whose version is
>>>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>>>> >> with my launch method that could be causing the delay, related to the
>>>>> >> included jars?
>>>>> >>
>>>>> >> Or is there something wrong with the very simple approach I'm taking
>>>>> >> for the application?
>>>>> >>
>>>>> >> Any advice is appriciated.
>>>>> >>
>>>>> >>
>>>>> >> The application:
>>>>> >>
>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>> >>
>>>>> >>
>>>>> >> From the streaming UI I get something like:
>>>>> >>
>>>>> >> table Completed Batches (last 1000 out of 27136)
>>>>> >>
>>>>> >>
>>>>> >> Batch Time Input Size Scheduling Delay (?) Processing Time (?) Total
>>>>> >> Delay (?) Output Ops: Succeeded/Total
>>>>> >>
>>>>> >> 2016/06/18 11:21:32 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>> >>
>>>>> >> 2016/06/18 11:21:31 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>> >>
>>>>> >> 2016/06/18 11:21:30 3000 events 1.2 h 1 s 1.2 h 1/1
>>>>> >>
>>>>> >>
>>>>> >> Here's how I'm launching the spark application.
>>>>> >>
>>>>> >>
>>>>> >> #!/usr/bin/env bash
>>>>> >>
>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>> >>
>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>> >>
>>>>> >> export
>>>>> >>
>>>>> >> HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>> >>
>>>>> >>
>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>> >>
>>>>> >> --class com.example.KafkaToHbase \
>>>>> >>
>>>>> >> --master spark://spark_master:7077 \
>>>>> >>
>>>>> >> --deploy-mode client \
>>>>> >>
>>>>> >> --num-executors 6 \
>>>>> >>
>>>>> >> --driver-memory 4G \
>>>>> >>
>>>>> >> --executor-memory 2G \
>>>>> >>
>>>>> >> --total-executor-cores 12 \
>>>>> >>
>>>>> >> --jars
>>>>> >>
>>>>> >> /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar
>>>>> >> \
>>>>> >>
>>>>> >> --conf spark.app.name="Kafka To Hbase" \
>>>>> >>
>>>>> >> --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>> >>
>>>>> >> --conf spark.eventLog.enabled=false \
>>>>> >>
>>>>> >> --conf spark.eventLog.overwrite=true \
>>>>> >>
>>>>> >> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>> >>
>>>>> >> --conf spark.streaming.backpressure.enabled=false \
>>>>> >>
>>>>> >> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>> >>
>>>>> >> --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>> >>
>>>>> >> --driver-java-options
>>>>> >>
>>>>> >>
>>>>> >> -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*
>>>>> >> \
>>>>> >>
>>>>> >> /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable"
>>>>> >> "broker1:9092,broker2:9092"
>>>>> >>
>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>>>> >> <disc...@uw.edu>
>>>>> >> wrote:
>>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>>> >> > Then I'm in the process of using the direct api.
>>>>> >> >
>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <c...@koeninger.org>
>>>>> >> > wrote:
>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>>>> >> >> (as long as producers are distributing across partitions evenly).
>>>>> >> >>
>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>>>> >> >> <disc...@uw.edu>
>>>>> >> >> wrote:
>>>>> >> >>> Thanks again Cody. Regarding the details 66 kafka partitions on 3
>>>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>>>> >> >>> issue with the receiver was the large number of partitions. I had
>>>>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition
>>>>> >> >>> my
>>>>> >> >>> topic on insertion, ( by my own, unjustified reasoning, on a first
>>>>> >> >>> attempt ) . This worked well enough for me, I put 1.7 billion
>>>>> >> >>> entries
>>>>> >> >>> into Kafka on a map reduce job in 5 and a half hours.
>>>>> >> >>>
>>>>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>>>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>>>>> >> >>> jars
>>>>> >> >>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>>>> >> >>> yesterday, I tried building against 1.5.2. So far it's running
>>>>> >> >>> without
>>>>> >> >>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>>>>> >> >>> improvement using the same code, but I'll see how the direct api
>>>>> >> >>> handles it. In the end I can reduce the number of partitions in
>>>>> >> >>> Kafka
>>>>> >> >>> if it causes big performance issues.
>>>>> >> >>>
>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>>>> >> >>> <c...@koeninger.org>
>>>>> >> >>> wrote:
>>>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>>>> >> >>>> calls
>>>>> >> >>>> take(10) under the covers, but 380 records / second for a single
>>>>> >> >>>> receiver doesn't sound right in any case.
>>>>> >> >>>>
>>>>> >> >>>> Am I understanding correctly that you're trying to process a
>>>>> >> >>>> large
>>>>> >> >>>> number of already-existing kafka messages, not keep up with an
>>>>> >> >>>> incoming stream?  Can you give any details (e.g. hardware, number
>>>>> >> >>>> of
>>>>> >> >>>> topicpartitions, etc)?
>>>>> >> >>>>
>>>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>>>>> >> >>>> streams, or
>>>>> >> >>>> even just kafkacat, as a baseline.
>>>>> >> >>>>
>>>>> >> >>>>
>>>>> >> >>>>
>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>>>> >> >>>> <disc...@uw.edu> wrote:
>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>>>> >> >>>>> archives
>>>>> >> >>>>> but
>>>>> >> >>>>> couldn't find anything but a post from Spark 0.7.2 . I was going
>>>>> >> >>>>> to
>>>>> >> >>>>> use accumulators to make a counter, but then saw on the
>>>>> >> >>>>> Streaming
>>>>> >> >>>>> tab
>>>>> >> >>>>> the Receiver Statistics. Then I removed all other
>>>>> >> >>>>> "functionality"
>>>>> >> >>>>> except:
>>>>> >> >>>>>
>>>>> >> >>>>>
>>>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream =
>>>>> >> >>>>> KafkaUtils
>>>>> >> >>>>>       //createStream(JavaStreamingContext jssc,Class<K>
>>>>> >> >>>>> keyTypeClass,Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>> >> >>>>> Class<T> valueDecoderClass, java.util.Map<String,String>
>>>>> >> >>>>> kafkaParams,
>>>>> >> >>>>> java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>>> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>>>>> >> >>>>> kafka.serializer.DefaultDecoder.class,
>>>>> >> >>>>> kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>>>>> >> >>>>> StorageLevel.MEMORY_AND_DISK_SER());
>>>>> >> >>>>>
>>>>> >> >>>>>        dstream.print();
>>>>> >> >>>>>
>>>>> >> >>>>> Then in the Recieiver Stats for the single receiver, I'm seeing
>>>>> >> >>>>> around
>>>>> >> >>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>>>>> >> >>>>> above, I'd need to run around 21 receivers, assuming 380 records
>>>>> >> >>>>> /
>>>>> >> >>>>> second, just using the print output. This seems awfully high to
>>>>> >> >>>>> me,
>>>>> >> >>>>> considering that I wrote 80000+ records a second to Kafka from a
>>>>> >> >>>>> mapreduce job, and that my bottleneck was likely Hbase. Again
>>>>> >> >>>>> using
>>>>> >> >>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>>>> >> >>>>> amount of reads.
>>>>> >> >>>>>
>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>>>> >> >>>>> expected
>>>>> >> >>>>> way
>>>>> >> >>>>> to use the Kafka streaming API, or am I doing something terribly
>>>>> >> >>>>> wrong?
>>>>> >> >>>>>
>>>>> >> >>>>> My application looks like
>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>> >> >>>>>
>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>>>> >> >>>>> <c...@koeninger.org>
>>>>> >> >>>>> wrote:
>>>>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>>>>> >> >>>>>> just
>>>>> >> >>>>>> deserialize)?
>>>>> >> >>>>>>
>>>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?
>>>>> >> >>>>>> The
>>>>> >> >>>>>> kafka direct stream is available starting with 1.3.  If you're
>>>>> >> >>>>>> stuck
>>>>> >> >>>>>> on 1.2, I believe there have been some attempts to backport it,
>>>>> >> >>>>>> search
>>>>> >> >>>>>> the mailing list archives.
>>>>> >> >>>>>>
>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>>>> >> >>>>>> <disc...@uw.edu> wrote:
>>>>> >> >>>>>>> I've written an application to get content from a kafka topic
>>>>> >> >>>>>>> with
>>>>> >> >>>>>>> 1.7
>>>>> >> >>>>>>> billion entries,  get the protobuf serialized entries, and
>>>>> >> >>>>>>> insert
>>>>> >> >>>>>>> into
>>>>> >> >>>>>>> hbase. Currently the environment that I'm running in is Spark
>>>>> >> >>>>>>> 1.2.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>>>> >> >>>>>>> between
>>>>> >> >>>>>>> 0-2500 writes / second. This will take much too long to
>>>>> >> >>>>>>> consume
>>>>> >> >>>>>>> the
>>>>> >> >>>>>>> entries.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>>>>> >> >>>>>>> bottleneck.
>>>>> >> >>>>>>> I've tried both 1.2 receivers, with the WAL and without, and
>>>>> >> >>>>>>> didn't
>>>>> >> >>>>>>> notice any large performance difference. I've tried many
>>>>> >> >>>>>>> different
>>>>> >> >>>>>>> spark configuration options, but can't seem to get better
>>>>> >> >>>>>>> performance.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> I saw 80000 requests / second inserting these records into
>>>>> >> >>>>>>> kafka
>>>>> >> >>>>>>> using
>>>>> >> >>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> While hbase inserts might not deliver the same throughput, I'd
>>>>> >> >>>>>>> like to
>>>>> >> >>>>>>> at least get 10%.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> My application looks like
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> This is my first spark application. I'd appreciate any
>>>>> >> >>>>>>> assistance.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>>
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> ---------------------------------------------------------------------
>>>>> >> >>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> >> >>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>> >> >>>>>>>
>>>>> >> >>>>
>>>>> >> >>>>
>>>>> >> >>>> ---------------------------------------------------------------------
>>>>> >> >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> >> >>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>> >> >>>>
>>>>> >>
>>>>> >> ---------------------------------------------------------------------
>>>>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> >> For additional commands, e-mail: user-h...@spark.apache.org
>>>>> >>
>>>>> >
>>>>
>>>>
>>>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to