I updated my app to Spark Streaming 1.5.2 so that it consumes from
Kafka using the direct API and inserts content into an HBase cluster,
as described in this thread. I was away from this project for a while
due to events in my family.

Currently my scheduling delay is high, but the processing time is
stable at around a second. I changed my setup to use 6 Kafka partitions
on a set of smaller Kafka brokers, with fewer disks. I've included
some details below, including the script I use to launch the
application. I'm using a Spark on HBase library, whose version is
tied to my HBase cluster. Is it apparent that there is something wrong
with my launch method that could be causing the delay, related to the
included jars?

Or is there something wrong with the very simple approach I'm taking
for the application?

Any advice is appreciated.


The application:

https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877


From the streaming UI I get something like:

Completed Batches (last 1000 out of 27136)

Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
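
As a rough sanity check on the table above, here is a minimal sketch of how a delay of this size could accumulate. It assumes a 1-second batch interval (inferred from the batch times, which are one second apart, not stated explicitly), that batches are processed one at a time, and that the delay started near zero. The class and method names are only illustrative.

```java
public class SchedulingDelayEstimate {
    // Average processing time per batch implied by an accumulated delay,
    // assuming batches run one at a time and the delay started at zero.
    static double impliedAvgProcessingSeconds(long completedBatches,
                                              double batchIntervalSeconds,
                                              double accumulatedDelaySeconds) {
        return batchIntervalSeconds + accumulatedDelaySeconds / completedBatches;
    }

    public static void main(String[] args) {
        long completedBatches = 27136;      // from the streaming UI
        double batchIntervalSeconds = 1.0;  // assumption: inferred from batch times
        double delaySeconds = 1.2 * 3600;   // 1.2 h scheduling delay, in seconds

        double avg = impliedAvgProcessingSeconds(
                completedBatches, batchIntervalSeconds, delaySeconds);
        System.out.printf("implied average processing time: %.3f s%n", avg);
    }
}
```

If those assumptions hold, the batches would average roughly 1.16 s each, so a processing time that the UI rounds to 1 s could still be slightly longer than the batch interval, and the scheduling delay would keep growing.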


Here's how I'm launching the spark application.


#!/usr/bin/env bash

export SPARK_CONF_DIR=/home/colin.williams/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar

/opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
  --class com.example.KafkaToHbase \
  --master spark://spark_master:7077 \
  --deploy-mode client \
  --num-executors 6 \
  --driver-memory 4G \
  --executor-memory 2G \
  --total-executor-cores 12 \
  --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
  --conf spark.app.name="Kafka To Hbase" \
  --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
  --conf spark.eventLog.enabled=false \
  --conf spark.eventLog.overwrite=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.streaming.backpressure.enabled=false \
  --conf spark.streaming.kafka.maxRatePerPartition=500 \
  --driver-class-path /home/colin.williams/kafka-hbase.jar \
  --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
  /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"

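For context on the launch settings: with the direct API, spark.streaming.kafka.maxRatePerPartition caps the records read per second from each Kafka partition, so the 3000-event batches in the UI are consistent with 500 x 6 partitions x a 1-second batch interval. The sketch below works through that arithmetic and the resulting time to read the 1.7 billion entries mentioned in the quoted messages below. The 1-second interval is an assumption inferred from the batch times, and the class and method names are illustrative only.

```java
public class RateCapEstimate {
    // Upper bound on records per batch under maxRatePerPartition.
    static long maxRecordsPerBatch(long maxRatePerPartition, int partitions,
                                   long batchIntervalSeconds) {
        return maxRatePerPartition * partitions * batchIntervalSeconds;
    }

    // Hours needed to read totalRecords at a fixed records-per-second rate.
    static double hoursToDrain(long totalRecords, long recordsPerSecond) {
        return totalRecords / (double) recordsPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        // 500 records/s per partition, 6 partitions, assumed 1 s batches.
        long cap = maxRecordsPerBatch(500, 6, 1);
        System.out.println("max records per batch: " + cap);

        // With 1 s batches, the per-batch cap is also the per-second rate.
        System.out.printf("hours to read 1.7 billion records: %.1f%n",
                hoursToDrain(1_700_000_000L, cap));
    }
}
```

Under these assumptions the cap works out to 3000 records per batch, and reading all 1.7 billion records at that rate would take roughly 157 hours even if every batch finished on time.
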
On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
> Thanks Cody, I can see that the partitions are well distributed...
> Then I'm in the process of using the direct api.
>
> On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> 60 partitions in and of itself shouldn't be a big performance issue
>> (as long as producers are distributing across partitions evenly).
>>
>> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>> Thanks again Cody. Regarding the details: 66 Kafka partitions on 3
>>> Kafka servers, likely 8-core systems with 10 disks each. Maybe the
>>> issue with the receiver was the large number of partitions. I had
>>> miscounted the disks, and so 11*3*2 is how I decided to partition my
>>> topic on insertion (by my own unjustified reasoning, on a first
>>> attempt). This worked well enough for me: I put 1.7 billion entries
>>> into Kafka with a MapReduce job in five and a half hours.
>>>
>>> I was concerned using spark 1.5.2 because I'm currently putting my
>>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library jars
>>> built for spark 1.2 on CDH 5.3. But after debugging quite a bit
>>> yesterday, I tried building against 1.5.2. So far it's running without
>>> issue on a Spark 1.5.2 cluster. I'm not sure there was too much
>>> improvement using the same code, but I'll see how the direct api
>>> handles it. In the end I can reduce the number of partitions in Kafka
>>> if it causes big performance issues.
>>>
>>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>> print() isn't really the best way to benchmark things, since it calls
>>>> take(10) under the covers, but 380 records / second for a single
>>>> receiver doesn't sound right in any case.
>>>>
>>>> Am I understanding correctly that you're trying to process a large
>>>> number of already-existing kafka messages, not keep up with an
>>>> incoming stream?  Can you give any details (e.g. hardware, number of
>>>> topicpartitions, etc)?
>>>>
>>>> Really though, I'd try to start with spark 1.6 and direct streams, or
>>>> even just kafkacat, as a baseline.
>>>>
>>>>
>>>>
>>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>> Hello again. I searched for "backport kafka" in the list archives but
>>>>> couldn't find anything but a post from Spark 0.7.2. I was going to
>>>>> use accumulators to make a counter, but then saw on the Streaming tab
>>>>> the Receiver Statistics. Then I removed all other "functionality"
>>>>> except:
>>>>>
>>>>>
>>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>>>>       // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>>>>>       //   Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>>       //   Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams,
>>>>>       //   java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>>>       .createStream(jssc, byte[].class, byte[].class,
>>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>         kafka.serializer.DefaultDecoder.class, kafkaParamsMap, topicMap,
>>>>>         StorageLevel.MEMORY_AND_DISK_SER());
>>>>>
>>>>>        dstream.print();
>>>>>
>>>>> Then in the Receiver Stats for the single receiver, I'm seeing around
>>>>> 380 records / second. Then to get anywhere near my 10% mentioned
>>>>> above, I'd need to run around 21 receivers, assuming 380 records /
>>>>> second, just using the print output. This seems awfully high to me,
>>>>> considering that I wrote 80000+ records a second to Kafka from a
>>>>> mapreduce job, and that my bottleneck was likely Hbase. Again using
>>>>> the 380 estimate, I would need 200+ receivers to reach a similar
>>>>> amount of reads.
>>>>>
>>>>> Even given the issues with the 1.2 receivers, is this the expected way
>>>>> to use the Kafka streaming API, or am I doing something terribly
>>>>> wrong?
>>>>>
>>>>> My application looks like
>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>
>>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>> Have you tested for read throughput (without writing to hbase, just
>>>>>> deserialize)?
>>>>>>
>>>>>> Are you limited to using spark 1.2, or is upgrading possible?  The
>>>>>> kafka direct stream is available starting with 1.3.  If you're stuck
>>>>>> on 1.2, I believe there have been some attempts to backport it, search
>>>>>> the mailing list archives.
>>>>>>
>>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>>> I've written an application to get content from a kafka topic with 1.7
>>>>>>> billion entries,  get the protobuf serialized entries, and insert into
>>>>>>> hbase. Currently the environment that I'm running in is Spark 1.2.
>>>>>>>
>>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between
>>>>>>> 0-2500 writes / second. This will take much too long to consume the
>>>>>>> entries.
>>>>>>>
>>>>>>> I currently believe that the spark kafka receiver is the bottleneck.
>>>>>>> I've tried both 1.2 receivers, with the WAL and without, and didn't
>>>>>>> notice any large performance difference. I've tried many different
>>>>>>> spark configuration options, but can't seem to get better performance.
>>>>>>>
>>>>>>> I saw 80000 requests / second inserting these records into kafka using
>>>>>>> yarn / hbase / protobuf / kafka in a bulk fashion.
>>>>>>>
>>>>>>> While hbase inserts might not deliver the same throughput, I'd like to
>>>>>>> at least get 10%.
>>>>>>>
>>>>>>> My application looks like
>>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>
>>>>>>> This is my first spark application. I'd appreciate any assistance.
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>
