Re: Spark + Kafka processing trouble

2016-05-31 Thread Malcolm Lockyer
Thanks for the suggestions. I agree that there isn't some magic
configuration setting, and I'm not claiming the SQL operations
themselves are flawed - I just meant to convey the frustration of
having a non-trivial (but still simple) Spark streaming job perform
absolutely horribly on tiny amounts of data.

.count() is something I was adding to try to force calculation, and I
agree it might not be the best of tests.
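
For anyone following along, the bare-bones baseline Cody describes
below would look roughly like this (a sketch only, assuming the Scala
API and the Kafka 0.8 direct stream; broker address and topic name are
placeholders, not our real ones):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka-baseline")
    // ~1 second batches, per the 500ms-1s suggestion
    val ssc = new StreamingContext(conf, Seconds(1))

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "localhost:9092"), Set("input-topic"))

    // Just print each record; no take()/count()/print()
    stream.foreachRDD(rdd => rdd.foreach(println))

    ssc.start()
    ssc.awaitTermination()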

On Wed, Jun 1, 2016 at 2:34 AM, Cody Koeninger <c...@koeninger.org> wrote:
> There isn't a magic spark configuration setting that would account for
> multiple-second-long fixed overheads; you should be looking at maybe
> 200ms minimum for a streaming batch.  1024 kafka topicpartitions is
> not reasonable for the volume you're talking about.  Unless you have
> really extreme workloads, 32 or 64 is a better starting guess.
>
> Rather than jumping to conclusions about sql operations being the
> problem, start from the very beginning.  Read a stream of messages
> from kafka and just do .foreach(println), at a reasonable batch size
> (say 500ms or a second), and see how that keeps up in your
> environment.  Don't use take(), don't use count(), don't use print(),
> since they may have non-obvious performance implications.
>
> If that works, add on further operations one step at a time and see
> when issues arise.
>
> On Mon, May 30, 2016 at 8:45 PM, Malcolm Lockyer
> <malcolm.lock...@hapara.com> wrote:
>> Hopefully this is not off topic for this list, but I am hoping to
>> reach some people who have used Kafka + Spark before.
>>
>> We are new to Spark and are setting up our first production
>> environment and hitting a speed issue that may be configuration related
>> - and we have little experience in configuring Spark environments.
>>
>> So we've got a Spark streaming job that seems to take an inordinate
>> amount of time to process. I realize that without specifics, it is
>> difficult to trace - however the most basic primitives in Spark are
>> performing horribly. The lazy nature of Spark is making it difficult
>> for me to understand what is happening - any suggestions are very much
>> appreciated.
>>
>> Environment is MBP 2.2 i7. Spark master is "local[*]". We are using
>> Kafka and PostgreSQL, both local. The job is designed to:
>>
>> a) grab some data from Kafka
>> b) correlate with existing data in PostgreSQL
>> c) output data to Kafka
>>
>> I am isolating timings by calling System.nanoTime() before and after
>> something that forces calculation, for example .count() on a
>> DataFrame. It seems like every operation has a MASSIVE fixed overhead
>> and that is stacking up making each iteration on the RDD extremely
>> slow. Slow operations include pulling a single item from the Kafka
>> queue, running a simple query against PostgreSQL, and running a Spark
>> aggregation on an RDD with a handful of rows.
>>
>> The machine is not maxing out on memory, disk or CPU. The machine
>> seems to be doing nothing for a high percentage of the execution time.
>> We have reproduced this behavior on two other machines. So we're
>> suspecting a configuration issue.
>>
>> As a concrete example, we have a DataFrame produced by running a JDBC
>> query by mapping over an RDD from Kafka. Calling count() (I guess
>> forcing execution) on this DataFrame when there is *1* item/row (Note:
>> SQL database is EMPTY at this point so this is not a factor) takes 4.5
>> seconds, calling count when there are 10,000 items takes 7 seconds.
>>
>> Can anybody offer experience of something like this happening for
>> them? Any suggestions on how to understand what is going wrong?
>>
>> I have tried tuning the number of Kafka partitions - increasing this
>> seems to increase the concurrency and ultimately number of things
>> processed per minute, but to get something half decent, I'm going to
>> need to run with 1024 or more partitions. Is 1024 partitions a
>> reasonable number? What do you use in your environments?
>>
>> I've tried different options for batchDuration. The number of items
>> per iteration seems to be batchDuration * number of Kafka partitions,
>> but it is always still extremely slow (many items per iteration vs.
>> very few doesn't really seem to improve things). Can you suggest a
>> list of the Spark configuration parameters related to speed that you
>> think are key - preferably with the values you use for those
>> parameters?
>>
>> I'd really really appreciate any help or suggestions as I've been
>> working on this speed issue for 3 days without success and my head is
>> starting to hurt. Thanks in advance.

Re: Spark + Kafka processing trouble

2016-05-30 Thread Malcolm Lockyer
On Tue, May 31, 2016 at 3:14 PM, Darren Govoni  wrote:
> Well that could be the problem. A SQL database is essentially a big
> synchronizer. If you have a lot of spark tasks all bottlenecking on a single
> database socket (is the database clustered or colocated with spark workers?)
> then you will have blocked threads on the database server.

Totally agree this could be a big killer to scaling up; we are
planning to migrate. But in the meantime we are seeing such big issues
with test data of only a few records (1, 2, 1024, etc.) produced to
Kafka. Currently the database is NOT busy (CPU, memory and IO usage
on the DB are tiny).




Re: Spark + Kafka processing trouble

2016-05-30 Thread Malcolm Lockyer
On Tue, May 31, 2016 at 1:56 PM, Darren Govoni  wrote:
> So you are calling a SQL query (to a single database) within a spark
> operation distributed across your workers?

Yes, but currently with very small sets of data (1-10,000 records)
and on a single (dev) machine.





(sorry didn't reply to the list)




Spark + Kafka processing trouble

2016-05-30 Thread Malcolm Lockyer
Hopefully this is not off topic for this list, but I am hoping to
reach some people who have used Kafka + Spark before.

We are new to Spark and are setting up our first production
environment and hitting a speed issue that may be configuration related
- and we have little experience in configuring Spark environments.

So we've got a Spark streaming job that seems to take an inordinate
amount of time to process. I realize that without specifics, it is
difficult to trace - however the most basic primitives in Spark are
performing horribly. The lazy nature of Spark is making it difficult
for me to understand what is happening - any suggestions are very much
appreciated.

Environment is MBP 2.2 i7. Spark master is "local[*]". We are using
Kafka and PostgreSQL, both local. The job is designed to:

a) grab some data from Kafka
b) correlate with existing data in PostgreSQL
c) output data to Kafka
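
To make (a)-(c) concrete, the job has roughly the following shape (a
heavily simplified sketch, not the actual code; topic, table, column
and connection names are placeholders):

    import java.util.Properties
    import kafka.serializer.StringDecoder
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka-pg-kafka")
    val ssc = new StreamingContext(conf, Seconds(1))
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext.implicits._

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "localhost:9092"), Set("input-topic"))

    stream.foreachRDD { rdd =>
      // (a) this batch's messages from Kafka, as (key, value) pairs
      val incoming = rdd.toDF("msg_key", "payload")

      // (b) correlate with existing data in PostgreSQL (placeholder table/column)
      val existing = sqlContext.read.jdbc(
        "jdbc:postgresql://localhost/appdb", "existing_data", new Properties())
      val joined = incoming.join(existing, incoming("payload") === existing("id"))

      // (c) write the correlated rows back out to Kafka, one producer per partition
      joined.toJSON.foreachPartition { rows =>
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        rows.foreach(row => producer.send(new ProducerRecord[String, String]("output-topic", row)))
        producer.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()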

I am isolating timings by calling System.nanoTime() before and after
something that forces calculation, for example .count() on a
DataFrame. It seems like every operation has a MASSIVE fixed overhead
and that is stacking up making each iteration on the RDD extremely
slow. Slow operations include pulling a single item from the Kafka
queue, running a simple query against PostgreSQL, and running a Spark
aggregation on an RDD with a handful of rows.
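
The timing harness is essentially just this (simplified sketch):

    // Wrap an action that forces evaluation (e.g. count()) and
    // report elapsed wall-clock time.
    def timed[T](label: String)(block: => T): T = {
      val start = System.nanoTime()
      val result = block
      val elapsedMs = (System.nanoTime() - start) / 1e6
      println(s"$label took $elapsedMs ms")
      result
    }

    // usage: timed("count on batch DataFrame") { someDataFrame.count() }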

The machine is not maxing out on memory, disk or CPU. The machine
seems to be doing nothing for a high percentage of the execution time.
We have reproduced this behavior on two other machines. So we're
suspecting a configuration issue.

As a concrete example, we have a DataFrame produced by running a JDBC
query by mapping over an RDD from Kafka. Calling count() (I guess
forcing execution) on this DataFrame when there is *1* item/row (Note:
SQL database is EMPTY at this point so this is not a factor) takes 4.5
seconds, calling count when there are 10,000 items takes 7 seconds.

Can anybody offer experience of something like this happening for
them? Any suggestions on how to understand what is going wrong?

I have tried tuning the number of Kafka partitions - increasing this
seems to increase the concurrency and ultimately number of things
processed per minute, but to get something half decent, I'm going to
need to run with 1024 or more partitions. Is 1024 partitions a
reasonable number? What do you use in your environments?
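
For anyone wanting to see how much each batch actually pulls per
topic-partition, the direct stream exposes its offset ranges; a
minimal sketch (stream here is the DStream returned by
createDirectStream):

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    // Must be done on the RDD straight off the direct stream, before
    // any transformation that discards the Kafka offset information.
    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic}-${r.partition}: ${r.untilOffset - r.fromOffset} messages")
      }
    }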

I've tried different options for batchDuration. The number of items
per iteration seems to be batchDuration * number of Kafka partitions,
but it is always still extremely slow (many items per iteration vs.
very few doesn't really seem to improve things). Can you suggest a
list of the Spark configuration parameters related to speed that you
think are key - preferably with the values you use for those
parameters?
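
For clarity, by batchDuration I mean the interval given to the
StreamingContext. A minimal sketch with example values only
(spark.streaming.kafka.maxRatePerPartition is noted here just because
it also bounds how many records each batch pulls with the direct
stream):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("tuning-experiments")
      // Optional: caps records pulled per Kafka partition per second, so
      // records per batch <= maxRatePerPartition * partitions * batch seconds
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")

    // batchDuration: how often a new micro-batch is started
    val ssc = new StreamingContext(conf, Seconds(1))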

I'd really really appreciate any help or suggestions as I've been
working on this speed issue for 3 days without success and my head is
starting to hurt. Thanks in advance.



Thanks,

--

Malcolm Lockyer
