Re: Kafka Spark structured streaming latency benchmark.

2017-01-02 Thread Prashant Sharma
This issue was fixed in https://issues.apache.org/jira/browse/SPARK-18991.

--Prashant




Re: Kafka Spark structured streaming latency benchmark.

2016-12-20 Thread Prashant Sharma
Hi Shixiong,

Thanks for taking a look. I am trying a run to see whether making the
ContextCleaner run more frequently and/or making it non-blocking will help.
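
For reference, a minimal sketch of the cleaner-related settings I plan to experiment with (the values below are placeholders I picked for the experiment, not recommendations):

import org.apache.spark.sql.SparkSession

// Sketch only: stop the cleaner from blocking on each cleanup task, and make
// the periodic GC that drives reference processing run more often than the
// default 30 minutes.
val spark = SparkSession.builder()
  .appName("kafka-latency-benchmark")
  .master("local[20]")
  .config("spark.cleaner.referenceTracking.blocking", "false")
  .config("spark.cleaner.periodicGC.interval", "5min")
  .getOrCreate()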

--Prashant




Re: Kafka Spark structured streaming latency benchmark.

2016-12-19 Thread Shixiong(Ryan) Zhu
Hey Prashant. Thanks for your code. I did some investigation, and it turned
out that the ContextCleaner is too slow and its "referenceQueue" keeps growing.
My hunch is that cleaning broadcasts is very slow, since it is a blocking call.
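
To make the blocking-versus-non-blocking point concrete, a small illustrative sketch using the public broadcast API (this is just an illustration, not the ContextCleaner internals themselves):

import org.apache.spark.sql.SparkSession

object BroadcastCleanupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("broadcast-cleanup-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Broadcast a ~1 MB payload.
    val bc = sc.broadcast(Array.fill(1 << 20)(0.toByte))

    // blocking = true waits until the broadcast blocks are removed everywhere;
    // this is the kind of synchronous wait that can back up a cleanup queue.
    bc.unpersist(blocking = true)

    // blocking = false returns immediately and lets removal happen asynchronously.
    // bc.unpersist(blocking = false)

    spark.stop()
  }
}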



Re: Kafka Spark structured streaming latency benchmark.

2016-12-19 Thread Shixiong(Ryan) Zhu
Hey, Prashant. Could you track the GC roots of the byte arrays in the heap?
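
One way to do this is to take a live heap dump and then inspect the paths to GC roots of the byte[] instances in a heap analyzer such as Eclipse MAT (jmap -dump works equally well). A minimal sketch for triggering the dump programmatically from the driver:

import java.lang.management.ManagementFactory
import com.sun.management.HotSpotDiagnosticMXBean

// Illustrative helper: dumps the live heap to a file that can be opened in a
// heap analyzer to inspect the GC roots of the large byte arrays.
object HeapDump {
  def dump(path: String): Unit = {
    val bean = ManagementFactory.newPlatformMXBeanProxy(
      ManagementFactory.getPlatformMBeanServer,
      "com.sun.management:type=HotSpotDiagnostic",
      classOf[HotSpotDiagnosticMXBean])
    bean.dumpHeap(path, true) // live = true keeps only reachable objects
  }
}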



Re: Kafka Spark structured streaming latency benchmark.

2016-12-17 Thread Prashant Sharma
Furthermore, I ran the same thing with 26 GB of memory, which works out to
roughly 1.3 GB per thread. My jmap results and jstat results, collected after
running the job for more than 11 hours, again point to a memory constraint:
the same gradual slowdown, only more gradual, since there is considerably
more memory than in the previous runs.




Does this situation sound like a memory leak? The byte array objects occupy
more than 13 GB and are not being garbage collected.
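
As a rough cross-check of the jmap/jstat numbers, I can also log heap usage from inside the driver; a minimal sketch (my own helper, not part of the benchmark job):

// Logs JVM heap usage from a daemon thread so growth can be correlated with
// the latency scatter plots and the jmap/jstat snapshots.
object HeapLogger {
  def start(intervalMs: Long = 30000L): Unit = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        val rt = Runtime.getRuntime
        while (true) {
          val usedMb = (rt.totalMemory() - rt.freeMemory()) / (1024L * 1024L)
          println(s"heap used: $usedMb MB")
          Thread.sleep(intervalMs)
        }
      }
    })
    t.setDaemon(true)
    t.start()
  }
}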

--Prashant


On Sun, Dec 18, 2016 at 8:49 AM, Prashant Sharma wrote:

> Hi,
>
> The goal of my benchmark is to achieve an end-to-end latency lower than
> 100 ms and sustain it over time, by consuming from a Kafka topic and
> writing back to another Kafka topic using Spark. Since the job does no
> aggregation and performs constant-time processing on each message, this
> appeared to be an achievable target. But then there are some surprising
> and interesting patterns to observe.
>
> Basically, the setup has four components:
> 1) Kafka
> 2) A long-running Kafka producer, rate limited to 1000 msgs/sec, with each
> message about 1 KB in size.
> 3) A Spark job subscribed to the `test` topic, writing out to another
> topic, `output`.
> 4) A Kafka consumer reading from the `output` topic.
>
> How was the latency measured?
>
> While sending messages from the Kafka producer, each message is embedded
> with the timestamp at which it is pushed to the Kafka `test` topic. Spark
> receives each message and writes it out to the `output` topic as is. When
> these messages arrive at the Kafka consumer, their embedded timestamp is
> subtracted from the time of arrival at the consumer, and a scatter plot of
> the results is attached.
>
> The scatter plots sample only 10 minutes of data received during the
> initial hour, and then another 10 minutes of data received after 2 hours
> of running.
>
> These plots indicate a significant slowdown in latency: the later scatter
> plot shows that almost all messages were received with a delay larger than
> 2 seconds, whereas the first plot shows that most messages arrived with a
> latency of less than 100 ms. The two samples were taken roughly 2 hours
> apart.
>
> After running the test for 24 hours, the jstat and jmap output for the job
> indicates a possible memory constraint. To be clear, the job was run with
> local[20] and 5 GB of memory (spark.driver.memory). The job is
> straightforward and located here:
> https://github.com/ScrapCodes/KafkaProducer/blob/master/src/main/scala/com/github/scrapcodes/kafka/SparkSQLKafkaConsumer.scala
>
> What is causing the gradual slowdown? I need help in diagnosing the
> problem.
>
> Thanks,
>
> --Prashant
>
>
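
For anyone skimming the thread, the job under test is essentially a pass-through Structured Streaming query; here is a minimal sketch of its shape (illustrative only, assuming Spark 2.x with the spark-sql-kafka-0-10 connector; the real code is in the repository linked above):

import org.apache.spark.sql.SparkSession

object PassThroughSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[20]")
      .appName("kafka-latency-benchmark-sketch")
      .getOrCreate()

    // Read raw records from the `test` topic.
    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .load()

    // The producer has already embedded its send timestamp in the payload,
    // so the query just forwards the value unchanged.
    val passThrough = input.selectExpr("CAST(value AS STRING) AS value")

    // Write every record to the `output` topic.
    val query = passThrough.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "output")
      .option("checkpointLocation", "/tmp/kafka-latency-checkpoint")
      .start()

    query.awaitTermination()
  }
}

On the consumer side, the per-message latency is then simply the arrival time minus the embedded timestamp, e.g. System.currentTimeMillis() - embeddedTimestamp.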