Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
If you haven't looked at the offset ranges in the logs for the time period
in question, I'd start there.

On Jan 24, 2017 2:51 PM, "Hakan İlter" <hakanil...@gmail.com> wrote:

Sorry for the misunderstanding. When I said that, I meant there is no lag in
the consumers. Kafka Manager shows each consumer's coverage and lag status.

On Tue, Jan 24, 2017 at 10:45 PM, Cody Koeninger <c...@koeninger.org> wrote:

> When you said " I check the offset ranges from Kafka Manager and don't
> see any significant deltas.", what were you comparing it against?  The
> offset ranges printed in spark logs?
>
> On Tue, Jan 24, 2017 at 2:11 PM, Hakan İlter <hakanil...@gmail.com> wrote:
> > First of all, I can both see the "Input Rate" from Spark job's statistics
> > page and Kafka producer message/sec from Kafka manager. The numbers are
> > different when I have the problem. Normally these numbers are very near.
> >
> > Besides, the job is an ETL job, it writes the results to Elastic Search. An
> > another legacy app also writes the same results to a database. There are
> > huge difference between DB and ES. I know how many records we process daily.
> >
> > Everything works fine if I run a job instance for each topic.
> >
> > On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> I'm confused, if you don't see any difference between the offsets the
> >> job is processing and the offsets available in kafka, then how do you
> >> know it's processing less than all of the data?
> >>
> >> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter <hakanil...@gmail.com>
> >> wrote:
> >> > I'm using DirectStream as one stream for all topics. I check the offset
> >> > ranges from Kafka Manager and don't see any significant deltas.
> >> >
> >> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org>
> >> > wrote:
> >> >>
> >> >> Are you using receiver-based or direct stream?
> >> >>
> >> >> Are you doing 1 stream per topic, or 1 stream for all topics?
> >> >>
> >> >> If you're using the direct stream, the actual topics and offset ranges
> >> >> should be visible in the logs, so you should be able to see more
> >> >> detail about what's happening (e.g. all topics are still being
> >> >> processed but offsets are significantly behind, vs only certain topics
> >> >> being processed but keeping up with latest offsets)
> >> >>
> >> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com>
> >> >> wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data
> >> >> > from
> >> >> > multiple kafka topics. After starting the job, everything works fine
> >> >> > first
> >> >> > (like 700 req/sec) but after a while (couples of days or a week) it
> >> >> > starts
> >> >> > processing only some part of the data (like 350 req/sec). When I
> >> >> > check
> >> >> > the
> >> >> > kafka topics, I can see that there are still 700 req/sec coming to
> >> >> > the
> >> >> > topics. I don't see any errors, exceptions or any other problem. The
> >> >> > job
> >> >> > works fine when I start the same code with just single kafka topic.
> >> >> >
> >> >> > Do you have any idea or a clue to understand the problem?
> >> >> >
> >> >> > Thanks.
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > View this message in context:
> >> >> >
> >> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> >> >> > Sent from the Apache Spark User List mailing list archive at
> >> >> > Nabble.com.
> >> >> >
> >> >> > -
> >> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >> >
> >> >
> >> >
> >
> >
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
When you said "I check the offset ranges from Kafka Manager and don't
see any significant deltas.", what were you comparing it against?  The
offset ranges printed in spark logs?

On Tue, Jan 24, 2017 at 2:11 PM, Hakan İlter <hakanil...@gmail.com> wrote:
> First of all, I can both see the "Input Rate" from Spark job's statistics
> page and Kafka producer message/sec from Kafka manager. The numbers are
> different when I have the problem. Normally these numbers are very near.
>
> Besides, the job is an ETL job, it writes the results to Elastic Search. An
> another legacy app also writes the same results to a database. There are
> huge difference between DB and ES. I know how many records we process daily.
>
> Everything works fine if I run a job instance for each topic.
>
> On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> I'm confused, if you don't see any difference between the offsets the
>> job is processing and the offsets available in kafka, then how do you
>> know it's processing less than all of the data?
>>
>> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter <hakanil...@gmail.com>
>> wrote:
>> > I'm using DirectStream as one stream for all topics. I check the offset
>> > ranges from Kafka Manager and don't see any significant deltas.
>> >
>> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> Are you using receiver-based or direct stream?
>> >>
>> >> Are you doing 1 stream per topic, or 1 stream for all topics?
>> >>
>> >> If you're using the direct stream, the actual topics and offset ranges
>> >> should be visible in the logs, so you should be able to see more
>> >> detail about what's happening (e.g. all topics are still being
>> >> processed but offsets are significantly behind, vs only certain topics
>> >> being processed but keeping up with latest offsets)
>> >>
>> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com>
>> >> wrote:
>> >> > Hi everyone,
>> >> >
>> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data
>> >> > from
>> >> > multiple kafka topics. After starting the job, everything works fine
>> >> > first
>> >> > (like 700 req/sec) but after a while (couples of days or a week) it
>> >> > starts
>> >> > processing only some part of the data (like 350 req/sec). When I
>> >> > check
>> >> > the
>> >> > kafka topics, I can see that there are still 700 req/sec coming to
>> >> > the
>> >> > topics. I don't see any errors, exceptions or any other problem. The
>> >> > job
>> >> > works fine when I start the same code with just single kafka topic.
>> >> >
>> >> > Do you have any idea or a clue to understand the problem?
>> >> >
>> >> > Thanks.
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > View this message in context:
>> >> >
>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
>> >> > Sent from the Apache Spark User List mailing list archive at
>> >> > Nabble.com.
>> >> >
>> >> > -
>> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >
>> >
>> >
>
>




Re: welcoming Burak and Holden as committers

2017-01-24 Thread Cody Koeninger
Congrats, glad to hear it

On Jan 24, 2017 12:47 PM, "Shixiong(Ryan) Zhu" 
wrote:

> Congrats Burak & Holden!
>
> On Tue, Jan 24, 2017 at 10:39 AM, Joseph Bradley 
> wrote:
>
>> Congratulations Burak & Holden!
>>
>> On Tue, Jan 24, 2017 at 10:33 AM, Dongjoon Hyun 
>> wrote:
>>
>>> Great! Congratulations, Burak and Holden.
>>>
>>> Bests,
>>> Dongjoon.
>>>
>>> On 2017-01-24 10:29 (-0800), Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>> >  
>>> >
>>> > Congratulations, Burak and Holden.
>>> >
>>> > On Tue, Jan 24, 2017 at 1:27 PM Russell Spitzer <russell.spit...@gmail.com>
>>> > wrote:
>>> >
>>> > > Great news! Congratulations!
>>> > >
>>> > > On Tue, Jan 24, 2017 at 10:25 AM Dean Wampler
>>> > > wrote:
>>> > >
>>> > > Congratulations to both of you!
>>> > >
>>> > > dean
>>> > >
>>> > > *Dean Wampler, Ph.D.*
>>> > > Author: Programming Scala, 2nd Edition, Fast Data Architectures for
>>> > > Streaming Applications, Functional Programming for Java Developers,
>>> > > and Programming Hive (O'Reilly)
>>> > > Lightbend
>>> > > @deanwampler
>>> > > http://polyglotprogramming.com
>>> > > https://github.com/deanwampler
>>> > >
>>> > > On Tue, Jan 24, 2017 at 6:14 PM, Xiao Li wrote:
>>> > >
>>> > > Congratulations! Burak and Holden!
>>> > >
>>> > > 2017-01-24 10:13 GMT-08:00 Reynold Xin :
>>> > >
>>> > > Hi all,
>>> > >
>>> > > Burak and Holden have recently been elected as Apache Spark committers.
>>> > >
>>> > > Burak has been very active in a large number of areas in Spark, including
>>> > > linear algebra, stats/maths functions in DataFrames, Python/R APIs for
>>> > > DataFrames, dstream, and most recently Structured Streaming.
>>> > >
>>> > > Holden has been a long time Spark contributor and evangelist. She has
>>> > > written a few books on Spark, as well as frequent contributions to the
>>> > > Python API to improve its usability and performance.
>>> > >
>>> > > Please join me in welcoming the two!
>>> > >
>>> > >
>>> > >
>>> > >
>>> > >
>>> >
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>>
>> Joseph Bradley
>>
>> Software Engineer - Machine Learning
>>
>> Databricks, Inc.
>>
>>
>
>


Re: Feedback on MLlib roadmap process proposal

2017-01-24 Thread Cody Koeninger
Totally agree with most of what Sean said, just wanted to give an
alternate take on the "maintainers" thing

On Tue, Jan 24, 2017 at 10:23 AM, Sean Owen  wrote:
> There is no such list because there's no formal notion of ownership or
> access to subsets of the project. Tracking an informal notion would be
> process mostly for its own sake, and probably just go out of date. We sort
> of tried this with 'maintainers' and it didn't actually do anything.
>

My perception of that situation is that the Apache process is actively
antagonistic towards factoring out responsibility for particular parts
of the code into a hierarchy.  I think if Spark was under a different
open source model, with otherwise exactly the same committers, that
attempt at identifying maintainers would have worked out differently.




Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
I'm confused, if you don't see any difference between the offsets the
job is processing and the offsets available in kafka, then how do you
know it's processing less than all of the data?

On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter <hakanil...@gmail.com> wrote:
> I'm using DirectStream as one stream for all topics. I check the offset
> ranges from Kafka Manager and don't see any significant deltas.
>
> On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Are you using receiver-based or direct stream?
>>
>> Are you doing 1 stream per topic, or 1 stream for all topics?
>>
>> If you're using the direct stream, the actual topics and offset ranges
>> should be visible in the logs, so you should be able to see more
>> detail about what's happening (e.g. all topics are still being
>> processed but offsets are significantly behind, vs only certain topics
>> being processed but keeping up with latest offsets)
>>
>> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com> wrote:
>> > Hi everyone,
>> >
>> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
>> > multiple kafka topics. After starting the job, everything works fine
>> > first
>> > (like 700 req/sec) but after a while (couples of days or a week) it
>> > starts
>> > processing only some part of the data (like 350 req/sec). When I check
>> > the
>> > kafka topics, I can see that there are still 700 req/sec coming to the
>> > topics. I don't see any errors, exceptions or any other problem. The job
>> > works fine when I start the same code with just single kafka topic.
>> >
>> > Do you have any idea or a clue to understand the problem?
>> >
>> > Thanks.
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>




Re: Failure handling

2017-01-24 Thread Cody Koeninger
Can you identify the error case and call System.exit ?  It'll get
retried on another executor, but as long as that one fails the same
way...

If you can identify the error case at the time you're doing database
interaction and just prevent data being written then, that's what I
typically do.
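
A minimal sketch of that second suggestion, in plain Java without Spark: the
batch work and the offset write sit in one place, so the write is skipped
whenever the work fails. OffsetStore, GuardedCheckpoint and processBatch are
hypothetical names standing in for the external database and the job's own
code; they are not Spark or Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the external database that stores processed offsets. */
final class OffsetStore {
    final List<Long> saved = new ArrayList<>();

    void save(long untilOffset) {
        saved.add(untilOffset);
    }
}

final class GuardedCheckpoint {
    /**
     * Runs the batch work and stores the offset only if the work succeeded.
     * Returns true when the offset was written.
     */
    static boolean processBatch(Runnable work, long untilOffset, OffsetStore store) {
        try {
            work.run();
        } catch (RuntimeException e) {
            // Error case identified: write nothing, so the offset is not advanced.
            return false;
        }
        store.save(untilOffset);
        return true;
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        processBatch(() -> { }, 100L, store);
        processBatch(() -> { throw new IllegalStateException("bad record"); }, 200L, store);
        // Only the offset of the successful batch has been stored.
        System.out.println(store.saved);
    }
}
```

In a real job the same guard would live inside a single foreachRDD, so the
checkpoint is never a separate action that can run after a failed one.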

On Tue, Jan 24, 2017 at 7:50 AM, Erwan ALLAIN  wrote:
> Hello guys,
>
> I have a question regarding how spark handles failure.
>
> I’m using kafka direct stream
> Spark 2.0.2
> Kafka 0.10.0.1
>
> Here is a snippet of code
>
> val stream = createDirectStream(….)
>
> stream
>   .map(…)
>   .foreachRDD(doSomething)
>
> stream
>   .map(…)
>   .foreachRDD(doSomethingElse)
>
> The execution is FIFO, so the first action ends before the second starts;
> so far so good.
> However, I would like the streaming context to be stopped immediately when
> an error (fatal or not) occurs during the execution of the first action.
> It looks like the driver is not notified of the exception and launches the
> second action.
>
> In our case, the second action is performing checkpointing in an external
> database and we do not want to checkpoint if an error occurs before.
> We do not want to rely on spark checkpoint as it causes issues when upgrading
> the application.
>
> Let me know if it’s not clear !
>
> Thanks !
>
> Erwan




Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-23 Thread Cody Koeninger
Are you using receiver-based or direct stream?

Are you doing 1 stream per topic, or 1 stream for all topics?

If you're using the direct stream, the actual topics and offset ranges
should be visible in the logs, so you should be able to see more
detail about what's happening (e.g. all topics are still being
processed but offsets are significantly behind, vs only certain topics
being processed but keeping up with latest offsets)
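
A rough sketch of that comparison, in plain Java: given the newest offset per
topic-partition from Kafka and the last untilOffset per topic-partition copied
from the Spark logs, it separates "processed but behind" from "not processed
at all". OffsetLagCheck, lagByPartition and the "topic-partition" string keys
are hypothetical helpers for illustration, not part of Spark or Kafka.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class OffsetLagCheck {
    /** Marker for a partition that exists in Kafka but never shows up in the job's offset ranges. */
    static final long NOT_PROCESSED = -1L;

    /**
     * latestOffsets: newest offset per "topic-partition" as reported by Kafka.
     * processedUntil: last untilOffset per "topic-partition" taken from the Spark logs.
     * Returns the lag per partition, or NOT_PROCESSED when the job never read it.
     */
    static Map<String, Long> lagByPartition(Map<String, Long> latestOffsets,
                                            Map<String, Long> processedUntil) {
        Map<String, Long> lag = new TreeMap<>();
        for (Map.Entry<String, Long> e : latestOffsets.entrySet()) {
            Long until = processedUntil.get(e.getKey());
            lag.put(e.getKey(), until == null ? NOT_PROCESSED : e.getValue() - until);
        }
        return lag;
    }

    public static void main(String[] args) {
        // Made-up numbers: topicA keeps up, topicB is behind, topicC is never read.
        Map<String, Long> latest = new HashMap<>();
        latest.put("topicA-0", 1000L);
        latest.put("topicB-0", 1000L);
        latest.put("topicC-0", 1000L);
        Map<String, Long> processed = new HashMap<>();
        processed.put("topicA-0", 1000L);
        processed.put("topicB-0", 400L);
        System.out.println(lagByPartition(latest, processed));
    }
}
```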

On Mon, Jan 23, 2017 at 3:14 PM, hakanilter  wrote:
> Hi everyone,
>
> I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
> multiple kafka topics. After starting the job, everything works fine first
> (like 700 req/sec) but after a while (couples of days or a week) it starts
> processing only some part of the data (like 350 req/sec). When I check the
> kafka topics, I can see that there are still 700 req/sec coming to the
> topics. I don't see any errors, exceptions or any other problem. The job
> works fine when I start the same code with just single kafka topic.
>
> Do you have any idea or a clue to understand the problem?
>
> Thanks.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>




Re: Assembly for Kafka >= 0.10.0, Spark 2.2.0, Scala 2.11

2017-01-18 Thread Cody Koeninger
Spark 2.2 hasn't been released yet, has it?

Python support in kafka dstreams for 0.10 is probably never, there's a
jira ticket about this.

Stable, hard to say.  It was quite a few releases before 0.8 was
marked stable, even though it underwent little change.

On Wed, Jan 18, 2017 at 2:21 AM, Karamba  wrote:
> Hi,
>
> I am looking for an assembly for Spark 2.2.0 with Scala 2.11. I can't find
> one in MVN Repository. Moreover,
>
> "org.apache.spark" %% "spark-streaming-kafka-0-10_2.11" % "2.1.0"
>
> shows that even sbt does not find one:
>
> [error] (*:update) sbt.ResolveException: unresolved dependency:
> org.apache.spark#spark-streaming-kafka-0-10_2.11_2.11;2.1.0: not found
>
> Where do I find that library?
>
> Thanks and best regards,
> karamba
>
> PS: Does anybody know when python support becomes available in
> spark-streaming-kafka-0-10 and when it will reach "stable"?
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>




[jira] [Commented] (SPARK-19185) ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing

2017-01-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15826527#comment-15826527
 ] 

Cody Koeninger commented on SPARK-19185:


I'd expect setting cache capacity to zero to cause failures, but it's probably 
(slightly) faster to try than just patching the lines of code I pointed out.

In general, a single application using kafka consumers should not be reading 
from different places in the same topicpartition in different threads, because 
it breaks ordering guarantees.  That's an implication of Kafka semantics, not 
Spark semantics.  That's why the consumer cache exists the way it does.

Changing that behavior on a widespread basis is going to break ordering 
guarantees, which will break some people's existing jobs.  Hence my comments 
about arguing design decisions with committers.
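
For readers following the stack trace in the report below, here is a
simplified model of a consumer that only one thread may hold at a time. It is
a sketch under assumptions, not Kafka's actual code: SingleThreadConsumerModel
and its methods are hypothetical, and only the idea (one owning thread, a
ConcurrentModificationException for anyone else) is taken from the report.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified model of a consumer that only one thread may use at a time (not Kafka's real class). */
final class SingleThreadConsumerModel {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);

    /** Claims the consumer for the calling thread, or fails if another thread holds it. */
    void acquire() {
        long me = Thread.currentThread().getId();
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("consumer is not safe for multi-threaded access");
        }
    }

    void release() {
        owner.set(NO_OWNER);
    }

    /** Task0 holds the cached consumer while Task1 tries to use it; returns true if Task1 was rejected. */
    static boolean secondTaskRejected() {
        SingleThreadConsumerModel cached = new SingleThreadConsumerModel();
        AtomicBoolean rejected = new AtomicBoolean(false);
        cached.acquire(); // Task0: seeks and starts to poll
        Thread task1 = new Thread(() -> {
            try {
                cached.acquire(); // Task1: attempts to seek on the same cached consumer
                cached.release();
            } catch (ConcurrentModificationException e) {
                rejected.set(true);
            }
        });
        task1.start();
        try {
            task1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        cached.release();
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejected());
    }
}
```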

> ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing
> -
>
> Key: SPARK-19185
> URL: https://issues.apache.org/jira/browse/SPARK-19185
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.2
> Environment: Spark 2.0.2
> Spark Streaming Kafka 010
> Mesos 0.28.0 - client mode
> spark.executor.cores 1
> spark.mesos.extra.cores 1
> Reporter: Kalvin Chau
> Labels: streaming, windowing
>
> We've been running into ConcurrentModificationExceptions "KafkaConsumer is
> not safe for multi-threaded access" with the CachedKafkaConsumer. I've been
> working through debugging this issue and after looking through some of the
> spark source code I think this is a bug.
> Our set up is:
> Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using 
> Spark-Streaming-Kafka-010
> spark.executor.cores 1
> spark.mesos.extra.cores 1
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
> We would see the exception when in one executor there are two task worker 
> threads assigned the same Topic+Partition, but a different set of offsets.
> They would both get the same CachedKafkaConsumer, and whichever task thread 
> went first would seek and poll for all the records, and at the same time the 
> second thread would try to seek to its offset but fail because it is unable 
> to acquire the lock.
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
> Here are some relevant logs:
> {code}
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>   at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection

[jira] [Commented] (SPARK-19185) ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing

2017-01-15 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823164#comment-15823164
 ] 

Cody Koeninger commented on SPARK-19185:


This is a good error report, sorry it's taken me a while to get back to you on 
this.

My immediate suggestions to you as a workaround would be
- Try persist before windowing, so that batches of offsets from Kafka are only 
fetched once, rather than repeatedly and possibly simultaneously for a given 
kafka partition.  I'm assuming that's the underlying issue, but could be wrong.
- Failing that, KafkaRDD's constructor takes a boolean parameter indicating 
whether to use the consumer cache.  You can straightforwardly modify 
DirectKafkaInputDStream.compute to pass false.  This will require rebuilding 
only the kafka consumer jar, not redeploying all of spark.  This will be a 
performance hit, especially if you're using SSL, but is better than nothing.

Fixing this in the Spark master branch (either by allowing configuration of 
whether to use the consumer cache, or replacing the consumer cache with a pool 
of consumers with different group ids for the same topicpartition) is going to 
require getting the attention of a committer.  I don't really have the time to 
mess with that right now (happy to do the work, but zero interest in tracking 
down committers and arguing design decisions).

That being said, if one of the workarounds suggested above doesn't help you, 
let me know.



Re: Kafka 0.8 + Spark 2.0 Partition Issue

2017-01-06 Thread Cody Koeninger
Kafka is designed to only allow reads from leaders.  You need to fix
this at the kafka level not the spark level.

On Fri, Jan 6, 2017 at 7:33 AM, Raghu Vadapalli  wrote:
>
> My spark 2.0 + kafka 0.8 streaming job fails with a partition leaderset
> exception. When I check the kafka topic, the partition is indeed in error
> with Leader = -1 and an empty ISR.  I did a lot of googling, and everything
> points to either restarting or deleting the topic.  Doing either of those on
> a production system while other topics are in heavy use is next to impossible.
> Now my question: is there a way to force spark to read from a leaderless
> partition, accepting some data loss or inconsistency? Or to force an immediate
> sync followed by an election (a kafka users group question, but I am pushing
> my luck here :) )
>
> Topic: vzecsapplog Partition: 8 Leader: -1 Replicas: 5,4 Isr:
>
> --
> Regards,
> Raghu
>




Re: Spark Improvement Proposals

2017-01-03 Thread Cody Koeninger
I don't have a concern about voting vs consensus.

I have a concern that whatever the decision making process is, it is
explicitly announced on the ticket for the given proposal, with an explicit
deadline, and an explicit outcome.


On Tue, Jan 3, 2017 at 4:08 PM, Imran Rashid <iras...@cloudera.com> wrote:

> I'm also in favor of this.  Thanks for your persistence Cody.
>
> My take on the specific issues Joseph mentioned:
>
> 1) voting vs. consensus -- I agree with the argument Ryan Blue made
> earlier for consensus:
>
> > Majority vs consensus: My rationale is that I don't think we want to
> > consider a proposal approved if it had objections serious enough that
> > committers down-voted (or PMC depending on who gets a vote). If these
> > proposals are like PEPs, then they represent a significant amount of
> > community effort and I wouldn't want to move forward if up to half of the
> > community thinks it's an untenable idea.
>
> 2) Design doc template -- agree this would be useful, but also seems
> totally orthogonal to moving forward on the SIP proposal.
>
> 3) agree w/ Joseph's proposal for updating the template.
>
> One small addition:
>
> 4) Deciding on a name -- minor, but I think it's worth disambiguating from
> Scala's SIPs, and the best proposal I've heard is "SPIP".   At least, no
> one has objected.  (don't care enough that I'd object to anything else,
> though.)
>
>
> On Tue, Jan 3, 2017 at 3:30 PM, Joseph Bradley <jos...@databricks.com>
> wrote:
>
>> Hi Cody,
>>
>> Thanks for being persistent about this.  I too would like to see this
>> happen.  Reviewing the thread, it sounds like the main things remaining are:
>> * Decide about a few issues
>> * Finalize the doc(s)
>> * Vote on this proposal
>>
>> Issues & TODOs:
>>
>> (1) The main issue I see above is voting vs. consensus.  I have little
>> preference here.  It sounds like something which could be tailored based on
>> whether we see too many or too few SIPs being approved.
>>
>> (2) Design doc template  (This would be great to have for Spark
>> regardless of this SIP discussion.)
>> * Reynold, are you still putting this together?
>>
>> (3) Template cleanups.  Listing some items mentioned above + a new one
>> w.r.t. Reynold's draft
>> <https://docs.google.com/document/d/1-Zdi_W-wtuxS9hTK0P9qb2x-nRanvXmnZ7SUi4qMljg/edit#>
>> :
>> * Reinstate the "Where" section with links to current and past SIPs
>> * Add field for stating explicit deadlines for approval
>> * Add field for stating Author & Committer shepherd
>>
>> Thanks all!
>> Joseph
>>
>> On Mon, Jan 2, 2017 at 7:45 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> I'm bumping this one more time for the new year, and then I'm giving up.
>>>
>>> Please, fix your process, even if it isn't exactly the way I suggested.
>>>
>>> On Tue, Nov 8, 2016 at 11:14 AM, Ryan Blue <rb...@netflix.com> wrote:
>>> > On lazy consensus as opposed to voting:
>>> >
>>> > First, why lazy consensus? The proposal was for consensus, which is at least
>>> > three +1 votes and no vetos. Consensus has no losing side, it requires
>>> > getting to a point where there is agreement. Isn't that agreement what we
>>> > want to achieve with these proposals?
>>> >
>>> > Second, lazy consensus only removes the requirement for three +1 votes. Why
>>> > would we not want at least three committers to think something is a good
>>> > idea before adopting the proposal?
>>> >
>>> > rb
>>> >
>>> > On Tue, Nov 8, 2016 at 8:13 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>> >>
>>> >> So there are some minor things (the Where section heading appears to
>>> >> be dropped; wherever this document is posted it needs to actually link
>>> >> to a jira filter showing current / past SIPs) but it doesn't look like
>>> >> I can comment on the google doc.
>>> >>
>>> >> The major substantive issue that I have is that this version is
>>> >> significantly less clear as to the outcome of an SIP.
>>> >>
>>> >> The apache example of lazy consensus at
>>> >> http://apache.org/foundation/voting.html#LazyConsensus involves an
>>> >> explicit announcement of an explicit deadline, which I think are
>>> >> necessary for clarity.

Re: [Spark Kafka] How to update batch size of input dynamically for spark kafka consumer?

2017-01-03 Thread Cody Koeninger
You can't change the batch time, but you can limit the number of items
in the batch

http://spark.apache.org/docs/latest/configuration.html

spark.streaming.backpressure.enabled

spark.streaming.kafka.maxRatePerPartition
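
As a rough illustration of how the second setting caps a batch: with the
direct stream, maxRatePerPartition is a records-per-second limit per Kafka
partition, so the upper bound for one batch is rate x partitions x batch
interval in seconds. The rate and partition count below are made-up values,
the 7 second interval is the one from the question, and BatchCap is a
hypothetical helper, not a Spark class.

```java
final class BatchCap {
    /** Upper bound on records in one batch for a per-partition, per-second rate limit. */
    static long maxRecordsPerBatch(long maxRatePerPartition, int partitions, long batchSeconds) {
        return Math.multiplyExact(
                Math.multiplyExact(maxRatePerPartition, (long) partitions), batchSeconds);
    }

    public static void main(String[] args) {
        // Assumed values: 1000 records/sec per partition, 10 partitions, 7 second batches.
        System.out.println(maxRecordsPerBatch(1000L, 10, 7L)); // prints 70000
    }
}
```

With backpressure enabled the effective rate can drop below this cap, so the
number is a ceiling, not a prediction.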

On Tue, Jan 3, 2017 at 4:00 AM, 周家帅  wrote:
> Hi,
>
> I am an intermediate spark user and have some experience in large data
> processing. I posted this question on StackOverflow but received no response.
> My problem is as follows:
>
> I use createDirectStream in my spark streaming application. I set the batch
> interval to 7 seconds and most of the time the batch job can finish within
> about 5 seconds. However, in very rare cases, a batch job takes 60
> seconds, which delays the batches queued behind it. To cut down the total
> delay for these batches, I would like to process the streaming data that
> is spread over the delayed jobs in one go. This would help the stream
> return to normal as soon as possible.
>
> So, I want to know whether there is some method to dynamically update/merge
> the input batch size for spark and kafka when a delay appears.
>
> Many thanks for your help.
>
>
> --
> Jiashuai Zhou
>
> School of Electronics Engineering and Computer Science,
> Peking University
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Can't access the data in Kafka Spark Streaming globally

2016-12-23 Thread Cody Koeninger
This doesn't sound like a question regarding Kafka streaming, it
sounds like confusion about the scope of variables in spark generally.
Is that right?  If so, I'd suggest reading the documentation, starting
with a simple rdd (e.g. using sparkContext.parallelize), and
experimenting to confirm your understanding.
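
One way to see the scoping issue without a cluster: Spark serializes the function passed to foreachPartition and runs it on executors, so anything the function captures from the driver is a copy. The following is a plain-Java analogy, not Spark code. It sends a list through Java serialization to stand in for that shipping step, and the class and method names are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

public class ClosureCopySketch {
    // Round-trip an object through Java serialization, standing in for the
    // way a closure's captured variables are shipped to an executor.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T shipToExecutor(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> driverSide = new ArrayList<>();
        ArrayList<String> executorSide = shipToExecutor(driverSide);
        executorSide.add("some value"); // mutation happens on the copy only
        System.out.println("driver list size: " + driverSide.size());
        System.out.println("executor copy size: " + executorSide.size());
    }
}
```

In a real job, data usually gets back to the driver by returning it from a transformation and calling an action such as collect() on a small result, rather than by assigning to an outer variable.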

On Thu, Dec 22, 2016 at 11:46 PM, Sree Eedupuganti  wrote:
> I am trying to stream the data from Kafka to Spark.
>
> JavaPairInputDStream<String, String> directKafkaStream =
>     KafkaUtils.createDirectStream(ssc,
>         String.class,
>         String.class,
>         StringDecoder.class,
>         StringDecoder.class,
>         kafkaParams, topics);
>
> Here I am iterating over the JavaPairInputDStream to process the RDDs.
>
> directKafkaStream.foreachRDD(rdd -> {
>     rdd.foreachPartition(items -> {
>         while (items.hasNext()) {
>             String[] State = items.next()._2.split("\\,");
>
>             System.out.println(State[2] + "," + State[3] + "," + State[4] + "--");
>         }
>     });
> });
>
>
> With this I am able to access the String array, but when I try to access the
> String array data globally, I can't access the data. My requirement is to
> access this data globally, because I have another lookup table in Hive and
> I am trying to perform an operation on both. Any suggestions, please?
> Thanks.
>
>
> --
> Best Regards,
> Sreeharsha Eedupuganti




Re: Why foreachPartition function make duplicate invocation to map function for every message ? (Spark 2.0.2)

2016-12-16 Thread Cody Koeninger
Please post a minimal complete code example of what you are talking about

On Thu, Dec 15, 2016 at 6:00 PM, Michael Nguyen
 wrote:
> I have the following sequence of Spark Java API calls (Spark 2.0.2):
>
> 1. A Kafka stream is processed via a map function, which returns the string
> value from tuple2._2() for the JavaDStream, as in
>
> return tuple2._2();
>
> 2. The returned JavaDStream is then processed by foreachPartition, which is
> wrapped by foreachRDD.
>
> 3. foreachPartition's call function iterates over the RDD, as in
> inputRDD.next ();
>
> When data is received, step 1 is executed, which is correct. However,
> inputRDD.next () in step 3 makes a duplicate call to the map function in
> step 1. So that map function is called twice for every message:
>
> -  the first time when the message is received from the Kafka stream, and
>
> - the second time when Iterator inputParams.next () is invoked from
> foreachPartition's call function.
>
> I also tried transforming the data in the map function as in
>
> public TestTransformedClass call(Tuple2  tuple2) for step 1
>
> public void call(Iterator  inputParams) for step 3
>
> and the same issue occurs. So this issue occurs, no matter whether this
> sequence of Spark API calls involves data transformation or not.
>
> Questions:
>
> Since the message was already processed in step 1, why does inputRDD.next ()
> in step 3 make a duplicate call to the map function in step 1?
>
> How do I fix it to avoid the duplicate invocation for every message?
>
> Thanks.




Re: Spark 2 or Spark 1.6.x?

2016-12-12 Thread Cody Koeninger
You certainly can use a stable version of Kafka brokers with Spark
2.0.2; why would you think otherwise?

On Mon, Dec 12, 2016 at 8:53 AM, Amir Rahnama  wrote:
> Hi,
>
> You need to describe more.
>
> For example, in Spark 2.0.2, you can't use stable versions of Apache Kafka.
>
> In general, I would say start with 2.0.2-
>
> On Mon, Dec 12, 2016 at 7:34 AM, Lohith Samaga M 
> wrote:
>>
>> Hi,
>>
>> I am new to Spark. I would like to learn Spark.
>>
>> I think I should learn version 2.0.2.
>>
>> Or should I still go for version 1.6.x and then come to version 2.0.2?
>>
>>
>>
>> Please advise.
>>
>>
>>
>> Thanks in advance.
>>
>>
>>
>> Best regards / Mit freundlichen Grüßen / Sincères salutations
>>
>> M. Lohith Samaga
>>
>>
>
>
>
>
> --
> Thanks and Regards,
>
> Amir Hossein Rahnama
>
> Tel: +46 (0) 729 785 012
> Website: www.ambodi.com
> Twitter: @_ambodi




Re: Spark Streaming with Kafka

2016-12-12 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream

Use a separate group id for each stream, like the docs say.

If you're doing multiple output operations, and aren't caching, spark
is going to read from kafka again each time, and if some of those
reads are happening for the same group and same topicpartition, it's
not going to work.
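
A minimal sketch of the first point, assuming two streams in one application: each stream gets its own parameter map, and only group.id differs. This is plain Java that just builds the maps. The broker address, the "example-app-" prefix, and the stream names are hypothetical, and the KafkaUtils.createDirectStream call that each map would be passed to is not shown.

```java
import java.util.HashMap;
import java.util.Map;

public class GroupIdPerStreamSketch {
    // Build consumer parameters for one stream; only group.id varies per stream.
    public static Map<String, Object> kafkaParamsFor(String streamName) {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        params.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("group.id", "example-app-" + streamName); // hypothetical naming scheme
        params.put("enable.auto.commit", false);
        return params;
    }

    public static void main(String[] args) {
        // One parameter map per stream, each with a distinct group id.
        System.out.println(kafkaParamsFor("first").get("group.id"));
        System.out.println(kafkaParamsFor("second").get("group.id"));
    }
}
```

For the second point, one common approach is to map the records to plain values and cache that DStream before attaching several output operations, so that Kafka is read once per batch. That part depends on the job and is left out here.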

On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
 wrote:
> Hi Anton,
>
> What is the command you run your spark app with? Why not working with data
> instead of stream on your second stage operation? Can you provide logs with
> the issue?
>
> ConcurrentModificationException is not a spark issue, it means that you use
> the same Kafka consumer instance from more than one thread.
>
> Additionally,
>
> 1) As I understand it, a new Kafka consumer is created every time you call
> KafkaUtils.createDirectStream.
> 2) If you assign the same group id to several consumer instances, then each
> consumer will get a different set of messages from the same topic. This is
> a kind of load balancing that Kafka provides with its Consumer API.
>
> Oleksii
>
> On 11 December 2016 at 18:46, Anton Okolnychyi 
> wrote:
>>
>> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
>> nothing custom.
>>
>>
>> I will try to restate the initial question. Let's consider an example.
>>
>> 1. I create a stream and subscribe to a certain topic.
>>
>> val stream = KafkaUtils.createDirectStream(...)
>>
>> 2. I extract the actual data from the stream. For instance, word counts.
>>
>> val wordCounts = stream.map(record => (record.value(), 1))
>>
>> 3. Then I compute something and output the result to console.
>>
>> val firstResult = stream.reduceByWindow(...)
>> firstResult.print()
>>
>> Once that is done, I would like to perform another computation on top of
>> wordCounts and output that result again to console. In my current
>> understanding, I cannot just reuse wordCounts from Step 2 and should create
>> a new stream with another group id and then define the second computation.
>> Am I correct that if I add the next part, then I can get
>> "ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access"?
>>
>> // another computation on wordCounts
>> val secondResult = wordCounts.reduceByKeyAndWindow(...)
>> secondResult.output()
>>
>> Thanks,
>> Anton
>>
>> 2016-12-11 17:11 GMT+01:00 Timur Shenkao :
>>>
>>> Hi,
>>> Usual general questions are:
>>> -- what is your Spark version?
>>> -- what is your Kafka version?
>>> -- do you use "standard" Kafka consumer or try to implement something
>>> custom (your own multi-threaded consumer)?
>>>
>>> The freshest docs
>>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>>
>>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
>>> !!!)

>>>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>>
>>>
>>>
>>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
>>>  wrote:

>>>> Hi,
>>>>
>>>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>>>> someone can say whether the following assumption is correct.
>>>>
>>>> If I have multiple computations (each with its own output) on one stream
>>>> (created as KafkaUtils.createDirectStream), then there is a chance to have
>>>> ConcurrentModificationException: KafkaConsumer is not safe for
>>>> multi-threaded access.  To solve this problem, I should create a new stream
>>>> with different "group.id" for each computation.
>>>>
>>>> Am I right?
>>>>
>>>> Best regards,
>>>> Anton
>>>
>>>
>>
>




[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2016-12-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742285#comment-15742285
 ] 

Cody Koeninger commented on SPARK-17147:


If compacted topics are important to you, then you should help test the branch 
listed above.

Yes, you can try increasing poll.ms

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
> --
>
>                 Key: SPARK-17147
>                 URL: https://issues.apache.org/jira/browse/SPARK-17147
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>            Reporter: Robert Conrad
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.
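
To make the gap described in the issue concrete, here is a small plain-Java sketch. It is independent of Spark's KafkaRDD and CachedKafkaConsumer code, the offsets 0, 1, 4, 7 are invented to mimic a compacted partition, and the class and method names are hypothetical.

```java
import java.util.Arrays;

public class CompactedOffsetsSketch {
    // Walk the records assuming each offset is exactly the previous one plus 1.
    // Returns the first offset that breaks the assumption, or -1 if none does.
    public static long firstGap(long[] offsets) {
        long expected = offsets[0];
        for (long actual : offsets) {
            if (actual != expected) {
                return actual;
            }
            expected = actual + 1;
        }
        return -1;
    }

    // The workaround from the issue: derive the next offset to request from the
    // record that was actually returned (record.offset + 1).
    public static long[] nextRequestOffsets(long[] offsets) {
        long[] next = new long[offsets.length];
        for (int i = 0; i < offsets.length; i++) {
            next[i] = offsets[i] + 1;
        }
        return next;
    }

    public static void main(String[] args) {
        long[] compacted = {0, 1, 4, 7}; // offsets 2, 3, 5, 6 were compacted away
        System.out.println("first offset breaking the +1 assumption: " + firstGap(compacted));
        System.out.println("next offsets to request: "
                + Arrays.toString(nextRequestOffsets(compacted)));
    }
}
```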



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: [VOTE] Apache Spark 2.1.0 (RC2)

2016-12-09 Thread Cody Koeninger
Agree that frequent topic deletion is not a very Kafka-esque thing to do

On Fri, Dec 9, 2016 at 12:09 PM, Shixiong(Ryan) Zhu
 wrote:
> Sean, "stress test for failOnDataLoss=false" fails because the Kafka consumer
> may throw an NPE when a topic is deleted. I added some logic to retry on such
> failures; however, it may still fail when topic deletion is too frequent (the
> stress test). Just reopened
> https://issues.apache.org/jira/browse/SPARK-18588.
>
> Anyway, this is just a best effort to deal with a Kafka issue, and in
> practice people won't delete topics frequently, so this is not a release
> blocker.
>
> On Fri, Dec 9, 2016 at 2:55 AM, Sean Owen  wrote:
>>
>> As usual, the sigs / hashes are fine and licenses look fine.
>>
>> I am still seeing some test failures. A few I've seen over time and aren't
>> repeatable, but a few seem persistent. Has anyone else observed these? I'm on
>> Ubuntu 16 / Java 8 building for -Pyarn -Phadoop-2.7 -Phive
>>
>> If anyone can confirm I'll investigate the cause if I can. I'd hesitate to
>> support the release yet unless the build is definitely passing for others.
>>
>>
>> udf3Test(test.org.apache.spark.sql.JavaUDFSuite)  Time elapsed: 0.281 sec
>> <<< ERROR!
>> java.lang.NoSuchMethodError:
>> org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(Lcom/google/common/reflect/TypeToken;)Lscala/Tuple2;
>> at test.org.apache.spark.sql.JavaUDFSuite.udf3Test(JavaUDFSuite.java:107)
>>
>>
>>
>> - caching on disk *** FAILED ***
>>   java.util.concurrent.TimeoutException: Can't find 2 executors before
>> 3 milliseconds elapsed
>>   at
>> org.apache.spark.ui.jobs.JobProgressListener.waitUntilExecutorsUp(JobProgressListener.scala:584)
>>   at
>> org.apache.spark.DistributedSuite.org$apache$spark$DistributedSuite$$testCaching(DistributedSuite.scala:154)
>>   at
>> org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply$mcV$sp(DistributedSuite.scala:191)
>>   at
>> org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply(DistributedSuite.scala:191)
>>   at
>> org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply(DistributedSuite.scala:191)
>>   at
>> org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
>>   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>>   ...
>>
>>
>> - stress test for failOnDataLoss=false *** FAILED ***
>>   org.apache.spark.sql.streaming.StreamingQueryException: Query [id =
>> 3b191b78-7f30-46d3-93f8-5fbeecce94a2, runId =
>> 0cab93b6-19d8-47a7-88ad-d296bea72405] terminated with exception: null
>>   at
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:262)
>>   at
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:160)
>>   ...
>>   Cause: java.lang.NullPointerException:
>>   ...
>>
>>
>>
>> On Thu, Dec 8, 2016 at 4:40 PM Reynold Xin  wrote:
>>>
>>> Please vote on releasing the following candidate as Apache Spark version
>>> 2.1.0. The vote is open until Sun, December 11, 2016 at 1:00 PT and passes
>>> if a majority of at least 3 +1 PMC votes are cast.
>>>
>>> [ ] +1 Release this package as Apache Spark 2.1.0
>>> [ ] -1 Do not release this package because ...
>>>
>>>
>>> To learn more about Apache Spark, please see http://spark.apache.org/
>>>
>>> The tag to be voted on is v2.1.0-rc2
>>> (080717497365b83bc202ab16812ced93eb1ea7bd)
>>>
>>> List of JIRA tickets resolved are:
>>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.0
>>>
>>> The release files, including signatures, digests, etc. can be found at:
>>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc2-bin/
>>>
>>> Release artifacts are signed with the following key:
>>> https://people.apache.org/keys/committer/pwendell.asc
>>>
>>> The staging repository for this release can be found at:
>>> https://repository.apache.org/content/repositories/orgapachespark-1217
>>>
>>> The documentation corresponding to this release can be found at:
>>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc2-docs/
>>>
>>>
>>> (Note that the docs and staging repo are still being uploaded and will be
>>> available soon)
>>>
>>>
>>> ===
>>> How can I help test this release?
>>> ===
>>> If you are a Spark user, you can help us test this release by taking an
>>> existing Spark workload and running on this release candidate, then
>>> reporting any regressions.
>>>
>>> ===
>>> What should happen to JIRA tickets still targeting 2.1.0?
>>> 

Re: problem with kafka createDirectStream ..

2016-12-09 Thread Cody Koeninger
I'd say unzip your actual assembly jar and verify whether the kafka
consumer classes are 0.10.1 or 0.10.0.  We've seen reports of odd
behavior with 0.10.1 classes.  Possibly unrelated, but good to
eliminate.

On Fri, Dec 9, 2016 at 10:38 AM, Debasish Ghosh
<ghosh.debas...@gmail.com> wrote:
> oops .. it's 0.10.0 .. sorry for the confusion ..
>
> On Fri, Dec 9, 2016 at 10:07 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>>
>> My assembly contains the 0.10.1 classes .. Here are the dependencies
>> related to kafka & spark that my assembly has ..
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.kafka"  %  "kafka-streams"              % "0.10.0.0",
>>   "org.apache.spark" %% "spark-streaming-kafka-0-10" % spark,
>>   "org.apache.spark" %% "spark-core"                 % spark % "provided",
>>   "org.apache.spark" %% "spark-streaming"            % spark % "provided",
>>   "org.apache.spark" %% "spark-mllib"                % spark % "provided",
>>   "org.apache.spark" %% "spark-sql"                  % spark % "provided"
>> )
>>
>> regards.
>>
>> On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>>
>>> When you say 0.10.1 do you mean broker version only, or does your
>>> assembly contain classes from the 0.10.1 kafka consumer?
>>>
>>> On Fri, Dec 9, 2016 at 10:19 AM, debasishg <ghosh.debas...@gmail.com>
>>> wrote:
>>> > Hello -
>>> >
>>> > I am facing some issues with the following snippet of code that reads
>>> > from Kafka and creates DStream. I am using
>>> > KafkaUtils.createDirectStream(..) with Kafka 0.10.1 and Spark 2.0.1.
>>> >
>>> > // get the data from kafka
>>> > val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>>> >   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
>>> >     streamingContext,
>>> >     PreferConsistent,
>>> >     Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
>>> >   )
>>> >
>>> > // label and vectorize the value
>>> > val projected: DStream[(String, Vector)] = stream.map { record =>
>>> >   val (label, value) = record.value
>>> >   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>>> >   (label, vector)
>>> > }.transform(projectToLowerDimension)
>>> >
>>> > In the above snippet if I have the call to transform in the last line,
>>> > I get the following exception ..
>>> >
>>> > Caused by: java.util.ConcurrentModificationException: KafkaConsumer is
>>> > not safe for multi-threaded access
>>> > at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>>> > at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>>> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>>> > at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>>> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>>> > at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>>> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>>> > at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>> > at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>>> > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>>> > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>>> > at

Re: problem with kafka createDirectStream ..

2016-12-09 Thread Cody Koeninger
When you say 0.10.1 do you mean broker version only, or does your
assembly contain classes from the 0.10.1 kafka consumer?

On Fri, Dec 9, 2016 at 10:19 AM, debasishg  wrote:
> Hello -
>
> I am facing some issues with the following snippet of code that reads from
> Kafka and creates DStream. I am using KafkaUtils.createDirectStream(..) with
> Kafka 0.10.1 and Spark 2.0.1.
>
> // get the data from kafka
> val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
>     streamingContext,
>     PreferConsistent,
>     Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
>   )
>
> // label and vectorize the value
> val projected: DStream[(String, Vector)] = stream.map { record =>
>   val (label, value) = record.value
>   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>   (label, vector)
> }.transform(projectToLowerDimension)
>
> In the above snippet if I have the call to transform in the last line, I get
> the following exception ..
>
> Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not
> safe for multi-threaded access
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> at scala.collection.AbstractIterator.to(Iterator.scala:1336)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> 
>
> The transform method does a PCA and gives the top 2 principal components ..
>
> private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
>   if (rdd.isEmpty) rdd else {
>     // reduce to 2 dimensions
>     val pca = new PCA(2).fit(rdd.map(_._2))
>
>     // Project vectors to the linear space spanned by the top 2 principal
>     // components, keeping the label
>     rdd.map(p => (p._1, pca.transform(p._2)))
>   }
> }
>
> However if I remove the transform call, I can process everything correctly.
>
> Any help will be most welcome ..
>
> regards.
> - Debasish
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-kafka-createDirectStream-tp28190.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>




Re: Reprocessing failed jobs in Streaming job

2016-12-07 Thread Cody Koeninger
If your operations are idempotent, you should be able to just run a
totally separate job that looks for failed batches and does a kafkaRDD
to reprocess that batch.  C* probably isn't the first choice for what
is essentially a queue, but if the frequency of batches is relatively
low it probably doesn't matter.
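
As a sketch of why idempotence matters here: if every write is keyed by the record's position in Kafka, replaying a failed batch's offset range any number of times leaves the store in the same state. The code below is plain Java with an in-memory map standing in for the real store. The class, the key format, and the placeholder values are all hypothetical, and in a real job the records for the range would come from a KafkaRDD (for example via KafkaUtils.createRDD with the stored offset ranges) rather than from a loop.

```java
import java.util.Map;
import java.util.TreeMap;

public class IdempotentReplaySketch {
    // Stand-in for a downstream store whose writes are keyed upserts.
    private final Map<String, String> store = new TreeMap<>();

    // Writing the same record twice leaves the store unchanged, because the
    // key is derived from the record's topic, partition, and offset.
    public void write(String topic, int partition, long offset, String value) {
        store.put(topic + "-" + partition + "-" + offset, value);
    }

    // Replay every offset in [fromOffset, untilOffset) for one partition.
    public void replay(String topic, int partition, long fromOffset, long untilOffset) {
        for (long offset = fromOffset; offset < untilOffset; offset++) {
            write(topic, partition, offset, "value@" + offset); // placeholder for the real record
        }
    }

    public int size() {
        return store.size();
    }

    public static void main(String[] args) {
        IdempotentReplaySketch sink = new IdempotentReplaySketch();
        sink.replay("events", 0, 100, 105); // first attempt
        sink.replay("events", 0, 100, 105); // reprocessing the same failed batch
        System.out.println("rows after two replays: " + sink.size());
    }
}
```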

That is indeed a weird stacktrace, did you investigate driver logs to
see if there was something else preceding it?

On Wed, Dec 7, 2016 at 2:41 PM, map reduced <k3t.gi...@gmail.com> wrote:
>> Personally I think forcing the stream to fail (e.g. check offsets in
>> downstream store and throw exception if they aren't as expected) is
>> the safest thing to do.
>
>
> I would think so too, but just for say 2-3 (sometimes just 1) failed batches
> in a whole day, I am trying to not kill the whole processing and restart.
>
> I am storing the offsets per batch and success/failure in a separate C*
> table - checkpointing was not an option due to it not working with
> application jar change etc.  Since I have access to the offsets, you think
> #2 or some variation of it may work?
>
> Btw, some of those failures I mentioned are strange, for instance (Spark
> 2.0.0 and spark-streaming-kafka-0-8_2.11):
>
> Job aborted due to stage failure: Task 173 in stage 92312.0 failed 10 times,
> most recent failure: Lost task 173.9 in stage 92312.0 (TID 27689025,
> 17.162.114.161): java.util.NoSuchElementException
>   at java.util.concurrent.ConcurrentSkipListMap.firstKey(ConcurrentSkipListMap.java:2036)
>   at com.yammer.metrics.stats.ExponentiallyDecayingSample.update(ExponentiallyDecayingSample.java:102)
>   at com.yammer.metrics.stats.ExponentiallyDecayingSample.update(ExponentiallyDecayingSample.java:81)
>   at com.yammer.metrics.core.Histogram.update(Histogram.java:110)
>   at com.yammer.metrics.core.Timer.update(Timer.java:198)
>   at com.yammer.metrics.core.Timer.update(Timer.java:76)
>   at com.yammer.metrics.core.TimerContext.stop(TimerContext.java:31)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:36)
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>   at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
>   at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:209)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> On Wed, Dec 7, 2016 at 12:16 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Personally I think forcing the stream to fail (e.g. check offsets in
>> downstream store and throw exception if they aren't as expected) is
>> the safest thing to do.
>>
>> If you proceed after a failure, you need a place to reliably record
>> the batches that failed for later processing.
>>
>> On Wed, Dec 7, 2016 at 1:46 PM, map reduced <k3t.gi...@gmail.com> wrote:
>> > Hi,
>> >
>> > I am trying to solve this problem - in my streaming flow, every day a few
>> > jobs fail for a few batches due to some (say Kafka cluster maintenance
>> > etc., mostly unavoidable) reasons and then resume succeeding.
>> > I want to reprocess those failed jobs programmatically (assume I have a
>> > way of getting start-end offsets for Kafka topics for failed jobs). I was
>> > thinking of these options:
>> > 1) Somehow pause streaming job when it detects failing jobs - this seems
>> > not possible.
>> > 2) From driver - run additional proce

Re: Reprocessing failed jobs in Streaming job

2016-12-07 Thread Cody Koeninger
Personally I think forcing the stream to fail (e.g. check offsets in
downstream store and throw exception if they aren't as expected) is
the safest thing to do.

If you proceed after a failure, you need a place to reliably record
the batches that failed for later processing.
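
A minimal sketch of the fail-fast check described above, in plain Java. An in-memory map stands in for the downstream store, and the class name, method name, and "topic-partition" key format are hypothetical. The idea is that a batch is accepted only if it starts exactly where the previously stored batch for that partition ended.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetGuardSketch {
    // Last stored "until" offset per topic-partition, as it would be read
    // from the downstream store.
    private final Map<String, Long> committed = new HashMap<>();

    // Throw if the batch does not start where the store left off;
    // otherwise record the batch's end offset.
    public void commitBatch(String topicPartition, long fromOffset, long untilOffset) {
        Long last = committed.get(topicPartition);
        if (last != null && last != fromOffset) {
            throw new IllegalStateException("Expected batch for " + topicPartition
                    + " to start at " + last + " but it starts at " + fromOffset);
        }
        committed.put(topicPartition, untilOffset);
    }

    public static void main(String[] args) {
        OffsetGuardSketch guard = new OffsetGuardSketch();
        guard.commitBatch("events-0", 0, 10);
        guard.commitBatch("events-0", 10, 20);
        try {
            guard.commitBatch("events-0", 25, 30); // offsets 20..24 were skipped
        } catch (IllegalStateException e) {
            System.out.println("stream should stop: " + e.getMessage());
        }
    }
}
```

In a real job the lookup and the update would need to happen in the same transaction as the output write, otherwise the check and the data can drift apart.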

On Wed, Dec 7, 2016 at 1:46 PM, map reduced  wrote:
> Hi,
>
> I am trying to solve this problem - in my streaming flow, every day a few jobs
> fail for a few batches due to some (say Kafka cluster maintenance etc., mostly
> unavoidable) reasons and then resume succeeding.
> I want to reprocess those failed jobs programmatically (assume I have a way
> of getting start-end offsets for kafka topics for failed jobs). I was
> thinking of these options:
> 1) Somehow pause streaming job when it detects failing jobs - this seems not
> possible.
> 2) From driver - run additional processing to check every few minutes using
> driver rest api (/api/v1/applications...) what jobs have failed and submit
> batch jobs for those failed jobs
>
> 1 - doesn't seem to be possible, and I don't want to kill streaming context
> just for few failing batches to stop the job for some time and resume after
> few minutes.
> 2 - seems like a viable option, but a little complicated, since even the
> batch job can fail due to whatever reasons and I am back to tracking that
> separately etc.
>
> Has anyone faced this issue, or does anyone have any suggestions?
>
> Thanks,
> KP




[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2016-12-07 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15729055#comment-15729055
 ] 

Cody Koeninger commented on SPARK-17147:


This ticket is about createDirectStream.  The question of whether it will be 
fixed is largely down to whether it's important enough to Sean or someone else 
to help test it thoroughly.

The stack trace you posted more than likely has nothing to do with this ticket, 
especially if you aren't using log compaction.  It's probably a network issue.  
Adjust spark.streaming.kafka.consumer.poll.ms, or do more investigation into 
what's going on with your network / Kafka.
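
For reference, the setting can be supplied on the SparkConf before the streaming context is created. This is a configuration fragment only: it needs spark-core on the classpath, the app name is hypothetical, and 10000 ms is an arbitrary example value rather than a recommendation.

```java
import org.apache.spark.SparkConf;

public class PollTimeoutConfSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("poll-timeout-example") // hypothetical app name
                // Poll timeout for the Kafka consumer, in milliseconds (example value).
                .set("spark.streaming.kafka.consumer.poll.ms", "10000");
        System.out.println(conf.get("spark.streaming.kafka.consumer.poll.ms"));
    }
}
```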

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
> --
>
>                 Key: SPARK-17147
>                 URL: https://issues.apache.org/jira/browse/SPARK-17147
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>            Reporter: Robert Conrad
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked-in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.






Re: Spark Streaming - join streaming and static data

2016-12-06 Thread Cody Koeninger
You do not need recent versions of spark, kafka, or structured
streaming in order to do this.  Normal DStreams are sufficient.

You can parallelize your static data from the database to an RDD, and
there's a join method available on RDDs.  Transforming a single given
timestamp line into multiple lines with modified timestamps can be
done using flatMap.
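
A minimal sketch of that idea, using plain Scala collections in place of RDDs so it runs without a cluster. With Spark, the lookup would be a join between RDDs keyed by ID, and the expansion would be the same flatMap. Message, staticRows, byId, expand and parse are hypothetical names; the data is copied from the quoted question below.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object JoinStaticSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")

  // Hypothetical shape of one incoming message after JSON parsing.
  final case class Message(timestamp: LocalDateTime, id: Int)

  // Static rows from the database as (ID, minute, value).
  val staticRows: Seq[(Int, Int, Int)] = Seq(
    (1, 0, 3), (1, 1, 5), (1, 2, 7), (1, 3, 8),
    (5, 0, 6), (5, 1, 6), (5, 2, 8), (5, 3, 5), (5, 4, 6)
  )

  // Key the static data by ID, as one would before an RDD join.
  val byId: Map[Int, Seq[(Int, Int)]] =
    staticRows.groupBy(_._1).map { case (id, rows) =>
      id -> rows.map { case (_, minute, value) => (minute, value) }
    }

  // One message becomes several (timestamp + minute, value) lines.
  def expand(msg: Message): Seq[(String, Int)] =
    byId.getOrElse(msg.id, Seq.empty).map { case (minute, value) =>
      (msg.timestamp.plusMinutes(minute.toLong).format(fmt), value)
    }

  def parse(ts: String, id: Int): Message =
    Message(LocalDateTime.parse(ts, fmt), id)

  def main(args: Array[String]): Unit = {
    val batch = Seq(parse("2016-12-06 13:00", 1), parse("2016-12-06 13:40", 5))
    // flatMap turns each message into its expanded lines.
    batch.flatMap(expand).foreach { case (ts, v) => println(s"$ts  $v") }
  }
}
```

Messages whose ID has no static rows simply produce no output lines, which matches inner-join behaviour.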

On Tue, Dec 6, 2016 at 11:11 AM, Burak Yavuz  wrote:
> Hi Daniela,
>
> This is trivial with Structured Streaming. If your Kafka cluster is 0.10.0
> or above, you may use Spark 2.0.2 to create a Streaming DataFrame from
> Kafka, and then also create a DataFrame using the JDBC connection, and you
> may join those. In Spark 2.1, there's support for a function called
> "from_json", which should also help you easily parse your messages incoming
> from Kafka.
>
> Best,
> Burak
>
> On Tue, Dec 6, 2016 at 2:16 AM, Daniela S  wrote:
>>
>> Hi
>>
>> I have some questions regarding Spark Streaming.
>>
>> I receive a stream of JSON messages from Kafka.
>> The messages consist of a timestamp and an ID.
>>
>> timestamp         ID
>> 2016-12-06 13:00  1
>> 2016-12-06 13:40  5
>> ...
>>
>> In a database I have values for each ID:
>>
>> ID   minute  value
>> 1    0       3
>> 1    1       5
>> 1    2       7
>> 1    3       8
>> 5    0       6
>> 5    1       6
>> 5    2       8
>> 5    3       5
>> 5    4       6
>>
>> So I would like to join each incoming JSON message with the corresponding
>> values. It should look as follows:
>>
>> timestamp         ID   minute  value
>> 2016-12-06 13:00  1    0       3
>> 2016-12-06 13:00  1    1       5
>> 2016-12-06 13:00  1    2       7
>> 2016-12-06 13:00  1    3       8
>> 2016-12-06 13:40  5    0       6
>> 2016-12-06 13:40  5    1       6
>> 2016-12-06 13:40  5    2       8
>> 2016-12-06 13:40  5    3       5
>> 2016-12-06 13:40  5    4       6
>> ...
>>
>> Then I would like to add the minute values to the timestamp. I only need
>> the computed timestamp and the values. So the result should look as follows:
>>
>> timestamp         value
>> 2016-12-06 13:00  3
>> 2016-12-06 13:01  5
>> 2016-12-06 13:02  7
>> 2016-12-06 13:03  8
>> 2016-12-06 13:40  6
>> 2016-12-06 13:41  6
>> 2016-12-06 13:42  8
>> 2016-12-06 13:43  5
>> 2016-12-06 13:44  6
>> ...
>>
>> Is this a possible use case for Spark Streaming? I thought I could join
>> the streaming data with the static data but I am not sure how to add the
>> minute values to the timestamp. Is this possible with Spark Streaming?
>>
>> Thank you in advance.
>>
>> Best regards,
>> Daniela
>>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Can spark support exactly once based kafka ? Due to these following question?

2016-12-05 Thread Cody Koeninger
Have you read / watched the materials linked from
https://github.com/koeninger/kafka-exactly-once
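
One idea commonly used for exactly-once output is to store results and Kafka offsets atomically, so that a replayed batch is detected and skipped. The following is a minimal in-memory sketch in plain Scala, not code from the linked repository. TransactionalStore, OffsetRange and commitBatch are hypothetical names, and `synchronized` stands in for a real database transaction.

```scala
// Hypothetical description of the slice of a partition that one batch covers.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// In-memory stand-in for a database that can update results and offsets in
// one transaction.
final class TransactionalStore {
  private var total: Long = 0L
  private var committed: Map[(String, Int), Long] = Map.empty

  def sum: Long = synchronized(total)

  def offsetFor(topic: String, partition: Int): Long =
    synchronized(committed.getOrElse((topic, partition), 0L))

  // Applies the batch only if it starts exactly where the last commit ended.
  // Returns true if applied, false if the batch was a replay (or out of
  // order) and was skipped.
  def commitBatch(range: OffsetRange, values: Seq[Long]): Boolean = synchronized {
    val key = (range.topic, range.partition)
    if (committed.getOrElse(key, 0L) != range.fromOffset) false
    else {
      total += values.sum                                   // the result ...
      committed = committed.updated(key, range.untilOffset) // ... and the offset move together
      true
    }
  }
}
```

Because the stored offset and the result change in the same step, running the same batch twice after a failure leaves the result unchanged.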

On Mon, Dec 5, 2016 at 4:17 AM, Jörn Franke  wrote:
> You need to do the bookkeeping of what has been processed yourself. This
> may mean roughly the following (of course the devil is in the details):
> Write down in ZooKeeper which part of the processing job has been done and
> for which dataset all the data has been created (do not keep the data itself
> in ZooKeeper).
> When you start a processing job, check in ZooKeeper whether it has already
> been processed: if not, remove all staging data; if yes, terminate.
>
> As I said, the details depend on your job and require some careful thinking,
> but exactly-once can be achieved with Spark (and potentially ZooKeeper or
> something similar, such as Redis).
> Of course, at the same time think about whether you need in-order delivery, etc.
>
> On 5 Dec 2016, at 08:59, Michal Šenkýř  wrote:
>
> Hello John,
>
> 1. If a task completes the operation, it will notify the driver. The driver
> may not receive the message due to the network, and think the task is still
> running. Then the child stage won't be scheduled?
>
> Spark's fault tolerance policy is, if there is a problem in processing a
> task or an executor is lost, run the task (and any dependent tasks) again.
> Spark attempts to minimize the number of tasks it has to recompute, so
> usually only a small part of the data is recomputed.
>
> So in your case, the driver simply schedules the task on another executor
> and continues to the next stage when it receives the data.
>
> 2. How does Spark guarantee that the downstream task receives the shuffle
> data completely? In fact, I can't find a checksum for blocks in Spark. For
> example, the upstream task may shuffle 100 MB of data, but the downstream
> task may receive only 99 MB due to the network. Can Spark verify that the
> data was received completely, based on its size?
>
> Spark uses compression with checksumming for shuffle data, so it should know
> when the data is corrupt and initiate a recomputation.
>
> As for your question in the subject:
> All of this means that Spark supports at-least-once processing. There is no
> way that I know of to ensure exactly-once. You can try to minimize
> more-than-once situations by updating your offsets as soon as possible but
> that does not eliminate the problem entirely.
>
> Hope this helps,
>
> Michal Senkyr




Re: Back-pressure to Spark Kafka Streaming?

2016-12-05 Thread Cody Koeninger
If you want finer-grained max rate setting, SPARK-17510 got merged a
while ago.  There's also SPARK-18580 which might help address the
issue of starting backpressure rate for the first batch.
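
For context on why a single rate setting can be too coarse: spark.streaming.kafka.maxRatePerPartition is a per-partition rate in records per second, so the cap on one batch is roughly rate x partitions x batch interval in seconds. A small worked example in plain Scala; the object name and the numbers are illustrative assumptions, not values from this thread.

```scala
object BatchBoundSketch {
  // Upper bound on records in one batch when every partition is read at the
  // configured per-partition rate (records per second) for the whole interval.
  def maxRecordsPerBatch(maxRatePerPartition: Long, partitions: Int, batchIntervalSeconds: Long): Long =
    maxRatePerPartition * partitions * batchIntervalSeconds

  def main(args: Array[String]): Unit = {
    // Assumed values: rate of 1000 records/s, 300 partitions, 2-minute batches.
    println(maxRecordsPerBatch(1000L, 300, 120L)) // 36000000
  }
}
```

Even a modest per-partition rate therefore allows very large batches once there are hundreds of partitions and minute-long intervals.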

On Mon, Dec 5, 2016 at 4:18 PM, Liren Ding  wrote:
> Hey all,
>
> Does backpressure actually work on spark kafka streaming? According to the
> latest spark streaming document:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html
> "In Spark 1.5, we have introduced a feature called backpressure that
> eliminate the need to set this rate limit, as Spark Streaming automatically
> figures out the rate limits and dynamically adjusts them if the processing
> conditions change. This backpressure can be enabled by setting the
> configuration parameter spark.streaming.backpressure.enabled to true."
> But I also see a few open spark jira tickets on this option:
> https://issues.apache.org/jira/browse/SPARK-7398
> https://issues.apache.org/jira/browse/SPARK-18371
>
> The case in the second ticket describes an issue similar to the one we have
> here. We use Kafka to send large batches (10~100M) to Spark Streaming, and
> the streaming interval is set to 1~4 minutes. With backpressure set to true,
> the queued active batches still pile up when the average batch processing
> time is longer than the configured interval. After the Spark driver is
> restarted, all queued batches turn into one giant batch, which blocks
> subsequent batches and is also likely to fail eventually. The only config we
> found that might help is "spark.streaming.kafka.maxRatePerPartition". It does
> limit the incoming batch size, but it is not a perfect solution, since the
> limit depends on the number of partitions as well as the length of the batch
> interval. For our case, hundreds of partitions X minutes of interval still
> produce a number that is too large for each batch. So we still want to figure
> out how to make backpressure work in Spark Kafka streaming, if it is supposed
> to work there. Thanks.
>
>
> Liren
>





[jira] [Commented] (SPARK-18682) Batch Source for Kafka

2016-12-04 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15720146#comment-15720146
 ] 

Cody Koeninger commented on SPARK-18682:


Isn't this a duplicate of https://issues.apache.org/jira/browse/SPARK-18386

Regarding limit, that would need to be a per partition limit, either explicitly 
or implicitly (divide n by number of partitions)
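
The implicit variant mentioned above is simple arithmetic. A minimal sketch in plain Scala; the object and function names are hypothetical, and rounding up (so that the per-partition limits together cover at least n) is an assumption rather than something stated in the ticket.

```scala
object PerPartitionLimitSketch {
  // Splits a global limit n across partitions, rounding up so that the
  // per-partition limits sum to at least n.
  def implicitLimit(n: Long, numPartitions: Int): Long = {
    require(numPartitions > 0, "numPartitions must be positive")
    (n + numPartitions - 1) / numPartitions
  }
}
```

For example, a limit of 100 over 3 partitions gives 34 per partition under this rounding.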

> Batch Source for Kafka
> --
>
> Key: SPARK-18682
> URL: https://issues.apache.org/jira/browse/SPARK-18682
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL, Structured Streaming
>Reporter: Michael Armbrust
>
> Today, you can start a stream that reads from kafka.  However, given kafka's 
> configurable retention period, it seems like sometimes you might just want to 
> read all of the data that is available now.  As such we should add a version 
> that works with {{spark.read}} as well.
> The options should be the same as the streaming kafka source, with the 
> following differences:
>  - {{startingOffsets}} should default to earliest, and should not allow 
> {{latest}} (which would always be empty).
>  - {{endingOffsets}} should also be allowed and should default to {{latest}}. 
> the same assign json format as {{startingOffsets}} should also be accepted.
> It would be really good, if things like {{.limit\(n\)}} were enough to 
> prevent all the data from being read (this might just work).






[jira] [Issue Comment Deleted] (SPARK-18682) Batch Source for Kafka

2016-12-04 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-18682:
---
Comment: was deleted

(was: Isn't this a duplicate of 
https://issues.apache.org/jira/browse/SPARK-18386

Regarding limit, that would need to be a per partition limit, either explicitly 
or implicitly (divide n by number of partitions))

> Batch Source for Kafka
> --
>
> Key: SPARK-18682
> URL: https://issues.apache.org/jira/browse/SPARK-18682
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL, Structured Streaming
>Reporter: Michael Armbrust
>
> Today, you can start a stream that reads from kafka.  However, given kafka's 
> configurable retention period, it seems like sometimes you might just want to 
> read all of the data that is available now.  As such we should add a version 
> that works with {{spark.read}} as well.
> The options should be the same as the streaming kafka source, with the 
> following differences:
>  - {{startingOffsets}} should default to earliest, and should not allow 
> {{latest}} (which would always be empty).
>  - {{endingOffsets}} should also be allowed and should default to {{latest}}. 
> the same assign json format as {{startingOffsets}} should also be accepted.
> It would be really good, if things like {{.limit\(n\)}} were enough to 
> prevent all the data from being read (this might just work).







[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-12-01 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712457#comment-15712457
 ] 

Cody Koeninger commented on SPARK-18506:


Yes, amazon linux.  No, not spark-ec2, just a spark tarball 
(spark-2.0.2-bin-hadoop2.7.tgz) unpacked on the master and workers.  Only 
setting changed was conf/slaves

> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic
> ---
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 
> with Kafka 0.10.1.0.   
>Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our 
> drivers to read all partitions of a single stream when kafka 
> auto.offset.reset=earliest running on a real cluster(separate VM nodes). 
> When we run our drivers with auto.offset.reset=latest ingesting from a single 
> kafka topic with multiple partitions (usually 10 but problem shows up  with 
> only 3 partitions), the driver reads correctly from all partitions.  
> Unfortunately, we need "earliest" for exactly once semantics.
> In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations in trying to isolate our problem 
> but it is only auto.offset.reset=earliest on a "real multi-machine cluster" 
> which causes this problem.
> 1. Ran with spark standalone cluster (4 Debian nodes, 8vCPU/30GB each) 
> instead of YARN 2.7.3. Single partition read problem persists in both cases. 
> Please note this problem occurs on an actual cluster of separate VM nodes 
> (but not when our engineer runs it as a cluster on his own Mac.)
> 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest scala driver that only logs.  (Our team uses java.) 
> Broken with both.
> 8. Tried increasing GCE capacity for cluster but already we were highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores.  Since driver works with auto.offset.reset=latest, we have ruled out 
> GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being set to 
> different offset configuration even though the consumer config correctly 
> indicates auto.offset.reset=earliest. 
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>  to broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>  from broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 
> (org.apache.kafka.clients.consumer.inte

[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-29 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15706814#comment-15706814
 ] 

Cody Koeninger commented on SPARK-18475:


Glad you agree it shouldn't be enabled by default.

If you're in an organization where you are responsible for shit that other 
people broke, but have no power to actually fix it correctly...  I'm not sure 
there's anything useful I can say there.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.






[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-29 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15706758#comment-15706758
 ] 

Cody Koeninger commented on SPARK-18475:


Burak hasn't empirically shown that it is of benefit for a properly 
partitioned, non-skewed kafka topic, especially if SSL is enabled (because of 
the effect on consumer caching).

Any output operation can tell the difference in ordering.

People are welcome to convince you that this is a worthwhile option, but there 
is no way it should be on by default.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.






[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-28 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704372#comment-15704372
 ] 

Cody Koeninger commented on SPARK-18506:


1 x spark master is m3 medium
2 x spark workers are m3 xlarge
Looking back at that particular testing setup, kafka and ZK were on a single m3 
large, which is admittedly unrealistic.
I'm a little too busy at the moment to try again with a more realistic setup 
though.

> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic
> ---
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 
> with Kafka 0.10.1.0.   
>Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our 
> drivers to read all partitions of a single stream when kafka 
> auto.offset.reset=earliest running on a real cluster(separate VM nodes). 
> When we run our drivers with auto.offset.reset=latest ingesting from a single 
> kafka topic with multiple partitions (usually 10 but problem shows up  with 
> only 3 partitions), the driver reads correctly from all partitions.  
> Unfortunately, we need "earliest" for exactly once semantics.
> In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations in trying to isolate our problem 
> but it is only auto.offset.reset=earliest on a "real multi-machine cluster" 
> which causes this problem.
> 1. Ran with spark standalone cluster (4 Debian nodes, 8vCPU/30GB each) 
> instead of YARN 2.7.3. Single partition read problem persists in both cases. 
> Please note this problem occurs on an actual cluster of separate VM nodes 
> (but not when our engineer runs it as a cluster on his own Mac.)
> 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest scala driver that only logs.  (Our team uses java.) 
> Broken with both.
> 8. Tried increasing GCE capacity for cluster but already we were highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores.  Since driver works with auto.offset.reset=latest, we have ruled out 
> GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being set to 
> different offset configuration even though the consumer config correctly 
> indicates auto.offset.reset=earliest. 
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>  to broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>  from broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {time

[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-23 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691386#comment-15691386
 ] 

Cody Koeninger commented on SPARK-18506:


I'm really confused by that - did you try a completely default install of
Kafka / spark?




> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic
> ---
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 
> with Kafka 0.10.1.0.   
>Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our 
> drivers to read all partitions of a single stream when kafka 
> auto.offset.reset=earliest running on a real cluster(separate VM nodes). 
> When we run our drivers with auto.offset.reset=latest ingesting from a single 
> kafka topic with multiple partitions (usually 10 but problem shows up  with 
> only 3 partitions), the driver reads correctly from all partitions.  
> Unfortunately, we need "earliest" for exactly once semantics.
> In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations in trying to isolate our problem 
> but it is only auto.offset.reset=earliest on a "real multi-machine cluster" 
> which causes this problem.
> 1. Ran with spark standalone cluster (4 Debian nodes, 8vCPU/30GB each) 
> instead of YARN 2.7.3. Single partition read problem persists in both cases. 
> Please note this problem occurs on an actual cluster of separate VM nodes 
> (but not when our engineer runs it as a cluster on his own Mac.)
> 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest scala driver that only logs.  (Our team uses java.) 
> Broken with both.
> 8. Tried increasing GCE capacity for cluster but already we were highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores.  Since driver works with auto.offset.reset=latest, we have ruled out 
> GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being set to 
> different offset configuration even though the consumer config correctly 
> indicates auto.offset.reset=earliest. 
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>  to broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>  from broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> {noformat}
> I've enclosed below the completely strippe

[jira] [Commented] (SPARK-18525) Kafka DirectInputStream cannot be aware of new partition

2016-11-23 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690915#comment-15690915
 ] 

Cody Koeninger commented on SPARK-18525:


Easiest thing to do is just restart your streaming job after you add partitions.

> Kafka DirectInputStream cannot be aware of new partition
> 
>
> Key: SPARK-18525
> URL: https://issues.apache.org/jira/browse/SPARK-18525
> Project: Spark
> Issue Type: Improvement
> Components: Input/Output
> Affects Versions: 2.0.2
> Reporter: Zhiwen Sun
>
> It seems that DirectKafkaInputDStream does not read newly added partitions 
> while the Spark Streaming job is running.
> Related Spark code:
> https://github.com/apache/spark/blob/v2.0.2/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L101
> How to reproduce it:
> {code:title=KafkaDirectTest.scala|borderStyle=solid}
> import kafka.common.TopicAndPartition
> import kafka.message.MessageAndMetadata
> import kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
>
> object KafkaDirectTest {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("kafka direct test 5")
>     conf.setIfMissing("spark.master", "local[3]")
>     conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
>     val ssc = new StreamingContext(conf, Seconds(1))
>     // Config is assumed to be the reporter's own configuration helper (not shown).
>     // zkQuorum is read but not used below.
>     val zkQuorum = Config("common").getString("kafka.zkquorum")
>     val topic = "test_use"
>     val groupId = "stream-test-0809"
>     val kafkaParams = Map(
>       "metadata.broker.list" -> "dev-002:9092,dev-004:9092",
>       "group.id" -> groupId
>     )
>     // fromOffsets and messageHandler are defined but never passed to createDirectStream.
>     val fromOffsets: Map[TopicAndPartition, Long] = Map(
>       new TopicAndPartition(topic, 0) -> 0L,
>       new TopicAndPartition(topic, 1) -> 0L,
>       new TopicAndPartition(topic, 2) -> 0L,
>       new TopicAndPartition(topic, 3) -> 0L
>     )
>     val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
>     val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, Set(topic))
>     lines.foreachRDD { rdd =>
>       rdd.foreach { row =>
>         println(s"\n row: ${row} ")
>       }
>       // Print the offset range each partition contributed to this batch.
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       offsetRanges.foreach { offset =>
>         println(s"\n- offset: ${offset.topic} ${offset.partition} " +
>           s"${offset.fromOffset} ${offset.untilOffset}")
>       }
>     }
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
> 1. Start the job.
> 2. Add a new partition to the test_use topic.
> The job does not read data from the new partition.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18525) Kafka DirectInputStream cannot be aware of new partition

2016-11-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686746#comment-15686746
 ] 

Cody Koeninger commented on SPARK-18525:


The 0.8 integration works only against the partitions that are defined when the stream starts. Use the 0.10 integration with SubscribePattern.
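
As a rough sketch of that suggestion, the reporter's job might look like this on the spark-streaming-kafka-0-10 integration. The broker addresses, topic, and group id are copied from the report; the object name, the deserializers, and the auto.offset.reset value are assumptions made for illustration, and the pattern here matches only test_use. With a subscription, the consumer should pick up added partitions after its next metadata refresh instead of requiring a restart.

```scala
import java.util.regex.Pattern

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

// Hypothetical object name; not part of the original report.
object KafkaPatternTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka pattern test")
    conf.setIfMissing("spark.master", "local[3]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // The 0.10 consumer takes bootstrap.servers rather than metadata.broker.list.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "dev-002:9092,dev-004:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream-test-0809",
      "auto.offset.reset" -> "earliest" // assumption: start from the beginning when no offsets exist
    )

    // Subscribe by pattern instead of listing a fixed set of partitions.
    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("test_use"), kafkaParams)
    )

    lines.foreachRDD { rdd =>
      // Same diagnostic as the original repro: print each partition's offset range.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"offset: ${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If a new partition shows up in the printed offset ranges a few minutes after it is added, the subscription is doing its job.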







[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-21 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15685190#comment-15685190
 ] 

Cody Koeninger commented on SPARK-18506:


I'd try to isolate aws vs gce as a possible cause before filing a bug against 
the kafka consumer.

I don't get paid to work on Spark, so most any time spent on it is during 
non-work hours regardless :)

> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic
> ---
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 with Kafka 0.10.1.0.
> Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our 
> drivers to read all partitions of a single stream when kafka 
> auto.offset.reset=earliest running on a real cluster(separate VM nodes). 
> When we run our drivers with auto.offset.reset=latest ingesting from a single 
> kafka topic with multiple partitions (usually 10 but problem shows up  with 
> only 3 partitions), the driver reads correctly from all partitions.  
> Unfortunately, we need "earliest" for exactly once semantics.
> In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations in trying to isolate our problem 
> but it is only auto.offset.reset=earliest on a "real multi-machine cluster" 
> which causes this problem.
> 1. Ran with spark standalone cluster (4 Debian nodes, 8vCPU/30GB each) 
> instead of YARN 2.7.3. Single partition read problem persists in both cases. 
> Please note this problem occurs on an actual cluster of separate VM nodes 
> (but not when our engineer runs it as a cluster on his own Mac).
> 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest scala driver that only logs.  (Our team uses java.) 
> Broken with both.
> 8. Tried increasing GCE capacity for cluster but already we were highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores.  Since driver works with auto.offset.reset=latest, we have ruled out 
> GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being set to 
> different offset configuration even though the consumer config correctly 
> indicates auto.offset.reset=earliest. 
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>  to broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>  from broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 
> (org.apach

[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-20 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15682216#comment-15682216
 ] 

Cody Koeninger commented on SPARK-18506:


I tried your example code on an AWS 2-node spark standalone cluster, still not 
able to reproduce the issue.

[ec2-user@ip-10-0-2-58 spark-2.0.2-bin-hadoop2.7]$ ./bin/spark-submit --master 
spark://ip-10-0-2-58.ec2.internal:7077 --class example.SimpleKafkaLoggingDriver 
~/kafka-example-assembly-2.0.0.jar 10.0.2.96:9092 simple_logtest mygroup 
earliest

16/11/21 01:41:31 INFO JobScheduler: Added jobs for time 1479692490000 ms
simple_logtest 3 offsets: 0 to 62
simple_logtest 0 offsets: 0 to 61
simple_logtest 1 offsets: 0 to 62
simple_logtest 2 offsets: 0 to 61
simple_logtest 4 offsets: 0 to 62
16/11/21 01:41:31 INFO JobScheduler: Finished job streaming job 1479692490000 
ms.0 from job set of time 1479692490000 ms
16/11/21 01:41:31 INFO ReceivedBlockTracker: Deleting batches: 
16/11/21 01:41:31 INFO JobScheduler: Total delay: 1.946 s for time 
1479692490000 ms (execution: 0.009 s)
16/11/21 01:41:32 INFO InputInfoTracker: remove old batch metadata: 
simple_logtest 3 offsets: 62 to 62
simple_logtest 0 offsets: 61 to 61
simple_logtest 1 offsets: 62 to 62
simple_logtest 2 offsets: 61 to 61
simple_logtest 4 offsets: 62 to 62
16/11/21 01:41:35 INFO JobScheduler: Starting job streaming job 1479692495000 
ms.0 from job set of time 1479692495000 ms

What happens when you use ConsumerStrategies.Assign to start at 0 for the 
partitions in question?
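
A minimal sketch of what that experiment could look like, assuming the spark-streaming-kafka-0-10 API. The object name is hypothetical, the argument order follows the spark-submit line above, and the partition count of 5 matches simple_logtest in this run; the reporter's topic would need its own count. Note that starting at 0 only works if offset 0 has not yet been removed by retention.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

// Hypothetical driver; not the reporter's SimpleKafkaLoggingDriver.
object AssignFromZero {
  def main(args: Array[String]): Unit = {
    // Expected arguments: brokers, topic, group id.
    val Array(brokers, topic, groupId) = args.take(3)
    val numPartitions = 5 // assumption: adjust to the topic under test

    val ssc = new StreamingContext(new SparkConf().setAppName("assign-from-zero"), Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Pin every partition to offset 0 explicitly, so auto.offset.reset is not consulted.
    val fromOffsets: Map[TopicPartition, Long] =
      (0 until numPartitions).map(p => new TopicPartition(topic, p) -> 0L).toMap

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )

    stream.foreachRDD { rdd =>
      // Log the range each partition contributed, in the same shape as the output above.
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If every partition reports a range starting at 0 here but not with auto.offset.reset=earliest, that would point at the offset reset path rather than at Spark's scheduling.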


[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-20 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15682171#comment-15682171
 ] 

Cody Koeninger commented on SPARK-18475:


An iterator certainly does have an ordering guarantee, and it's pretty 
straightforward to figure out whether a given operation shuffles.  Plenty of 
jobs have been written depending on that ordering guarantee, and it's 
documented for the Direct Stream.

The only reason it's a significant performance improvement is because the OP is 
mis-using kafka.  If he had reasonably even production into a reasonable number 
of partitions, there would be no performance improvement.

You guys might be able to convince Michael this is a good idea, but as I said, 
this isn't the first time this has come up, and my answer isn't likely to 
change.  I'm not "blocking" anything, I'm not a gatekeeper and have no more 
rights than you do.  I just think it's a really bad idea.
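
To illustrate why the per-partition ordering matters, here is a small plain-Scala sketch (no Spark involved; the Record type and the sample data are made up for illustration). It computes last-write-wins state per key from one partition, first as a single ordered iterator and then as two offset ranges whose results are applied in the wrong order, which is one way things could go wrong once several tasks read the same TopicPartition.

```scala
object PartitionOrdering {
  // One Kafka partition modeled as records in offset order (hypothetical type).
  final case class Record(offset: Long, key: String, value: Int)

  // Last-write-wins state per key; only correct if records arrive in offset order.
  def latestPerKey(records: Iterator[Record]): Map[String, Int] =
    records.foldLeft(Map.empty[String, Int])((state, r) => state + (r.key -> r.value))

  val partition: Vector[Record] = Vector(
    Record(0, "a", 1), Record(1, "b", 1), Record(2, "a", 2),
    Record(3, "b", 2), Record(4, "a", 3), Record(5, "b", 3)
  )

  // One task per partition: a single iterator in offset order.
  def oneTask: Map[String, Int] = latestPerKey(partition.iterator)

  // Two tasks on the same partition, offsets [0,3) and [3,6), with the second
  // task's records applied before the first's.
  def twoTasksOutOfOrder: Map[String, Int] = {
    val (first, second) = partition.splitAt(3)
    latestPerKey(second.iterator ++ first.iterator)
  }

  def main(args: Array[String]): Unit = {
    println(s"one task:  $oneTask")
    println(s"two tasks: $twoTasksOutOfOrder")
  }
}
```

The two results differ because older values overwrite newer ones in the second case; a job written against the one-iterator-per-partition guarantee has no reason to defend against that.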

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.0.2, 2.1.0
> Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.






Re: Mac vs cluster Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Cody Koeninger
This is running locally on my mac, but it's still a standalone spark
master with multiple separate executor jvms (i.e. using a spark://
master URL rather than local[2]), so it should be the same code paths.
I can't speak to yarn one way or the other, but you said you tried it
with the standalone scheduler.

At the very least, you should run ./bin/kafka-run-class.sh
kafka.tools.GetOffsetShell with --time -1 and --time -2 and compare
those results to what you're seeing from spark.  The results you posted
from spark didn't show any incoming messages at all.

On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
<hster.investiga...@gmail.com> wrote:
> Hi Cody,
>
> Thank you for testing this on a Saturday morning!  I failed to mention that
> when our data engineer runs our drivers(even complex ones) locally on his
> Mac, the drivers work fine. However when we launch it into the cluster (4
> machines either for a YARN cluster or spark standalone) we get this issue.
>
> Heji
>
>
> On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> I ran your example using the versions of kafka and spark you are
>> using, against a standalone cluster.  This is what I observed:
>>
>> (in kafka working directory)
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -2
>> simple_logtest:2:0
>> simple_logtest:4:0
>> simple_logtest:1:0
>> simple_logtest:3:0
>> simple_logtest:0:0
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -1
>> simple_logtest:2:31
>> simple_logtest:4:31
>> simple_logtest:1:31
>> simple_logtest:3:31
>> simple_logtest:0:31
>>
>> So in other words, there are 5 partitions, they all have messages in them
>>
>> (in spark working directory)
>>
>> bash-3.2$ ./bin/spark-submit --master
>> spark://Codys-MacBook-Pro.local:7077 --class
>> example.SimpleKafkaLoggingDriver
>>
>> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
>> localhost:9092 simple_logtest mygroup earliest
>>
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>>
>> simple_logtest 3 offsets: 0 to 31
>> simple_logtest 0 offsets: 0 to 31
>> simple_logtest 1 offsets: 0 to 31
>> simple_logtest 2 offsets: 0 to 31
>> simple_logtest 4 offsets: 0 to 31
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
>> 1479574025000 ms (execution: 0.005 s)
>> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
>> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
>> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
>> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
>> 1479574030000 ms.0 from job set of time 1479574030000 ms
>>
>> simple_logtest 3 offsets: 31 to 31
>> simple_logtest 0 offsets: 31 to 31
>> simple_logtest 1 offsets: 31 to 31
>> simple_logtest 2 offsets: 31 to 31
>> simple_logtest 4 offsets: 31 to 31
>>
>> So in other words, spark is indeed seeing offsets for each partition.
>>
>>
>> The results you posted look to me like there aren't any messages going
>> into the other partitions, which looks like a misbehaving producer.
>>
>> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
>> <hster.investiga...@gmail.com> wrote:
>> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
>> > been
>> > struggling with this show stopper problem.
>> >
>> > When we run our drivers with auto.offset.reset=latest ingesting from a
>> > single kafka topic with 10 partitions, the driver reads correctly from
>> > all
>> > 10 partitions.
>> >
>> > However when we use auto.offset.reset=earliest, the driver will read
>> > only a
>> > single partition.
>> >
>> > When we turn on the debug logs, we sometimes see partitions being set to
>> > different offset configuration even though the consumer config correctly
>> > indicates auto.offset.reset=earliest.
>> >
>> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest
>> >> offset.
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 DEBUG Resetting offset for partition simple_test-

Re: using StreamingKMeans

2016-11-19 Thread Cody Koeninger
So I haven't played around with streaming k means at all, but given
that no one responded to your message a couple of days ago, I'll say
what I can.

1. Can you not sample out some % of the stream for training?
2. Can you run multiple streams at the same time with different values
for k and compare their performance?
3. foreachRDD is fine in general, can't speak to the specifics.
4. If you haven't done any transformations yet on a direct stream,
foreachRDD will give you a KafkaRDD.  Checking whether a KafkaRDD is
empty is very cheap: it's done on the driver only, because the beginning
and ending offsets are already known.  So you should be able to skip
empty batches.
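
A plain-Scala sketch of the idea behind point 4, runnable without Spark. The Range case class is a hypothetical stand-in for Spark's OffsetRange, and the sample values are only illustrative: a batch is empty exactly when no partition's offsets advanced, so no records have to be fetched to find out.

```scala
object EmptyBatchCheck {
  // Hypothetical stand-in for Spark's OffsetRange, so the sketch has no dependencies.
  final case class Range(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // Emptiness follows from the offsets alone.
  def isEmptyBatch(ranges: Seq[Range]): Boolean = ranges.forall(_.count == 0)

  // Runs `body` only when the batch has data; returns whether it ran.
  def processIfNonEmpty(ranges: Seq[Range])(body: Seq[Range] => Unit): Boolean =
    if (isEmptyBatch(ranges)) false
    else { body(ranges); true }

  def main(args: Array[String]): Unit = {
    val withData = (0 until 5).map(p => Range("events", p, 0L, 31L))
    val idle = (0 until 5).map(p => Range("events", p, 31L, 31L))

    for (batch <- Seq(withData, idle)) {
      val ran = processIfNonEmpty(batch) { rs =>
        println(s"fitting on ${rs.map(_.count).sum} records")
      }
      if (!ran) println("empty batch, skipped")
    }
  }
}
```

In an actual job the equivalent guard would be an `if (!rdd.isEmpty())` check at the top of the foreachRDD body, before calling anything like StandardScaler's fit.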



On Sat, Nov 19, 2016 at 10:46 AM, debasishg  wrote:
> Hello -
>
> I am trying to implement an outlier detection application on streaming data.
> I am a newbie to Spark and hence would like some advice on the confusions
> that I have ..
>
> I am thinking of using StreamingKMeans - is this a good choice ? I have one
> stream of data and I need an online algorithm. But here are some questions
> that immediately come to my mind ..
>
> 1. I cannot do separate training, cross validation etc. Is this a good idea
> to do training and prediction online ?
>
> 2. The data will be read from the stream coming from Kafka in microbatches
> of (say) 3 seconds. I get a DStream on which I train and get the clusters.
> How can I decide on the number of clusters ? Using StreamingKMeans is there
> any way I can iterate on microbatches with different values of k to find the
> optimal one ?
>
> 3. Even if I fix k, after training on every microbatch I get a DStream. How
> can I compute things like clustering score on the DStream ?
> StreamingKMeansModel has a computeCost function but it takes an RDD. I can
> use dstream.foreachRDD { // process RDD for the micro batch here } - is this
> the idiomatic way ?
>
> 4. If I use dstream.foreachRDD { .. } and use functions like new
> StandardScaler().fit(rdd) to do feature normalization, then it works when I
> have data in the stream. But when the microbatch is empty (say I don't have
> data for some time), the fit method throws exception as it gets an empty
> collection. Things start working ok when data starts coming back to the
> stream. But is this the way to go ?
>
> any suggestion will be welcome ..
>
> regards.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/using-StreamingKMeans-tp28109.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>




Re: Kafka direct approach,App UI shows wrong input rate

2016-11-19 Thread Cody Koeninger
There have definitely been issues with UI reporting for the direct
stream in the past, but I'm not able to reproduce this with 2.0.2 and
0.8.  See below:

https://i.imgsafe.org/086019ae57.png



On Fri, Nov 18, 2016 at 4:38 AM, Julian Keppel
 wrote:
> Hello,
>
> I use Spark 2.0.2 with Kafka integration 0-8. The Kafka version is 0.10.0.1
> (Scala 2.11). I read data from Kafka with the direct approach. The complete
> infrastructure runs on Google Container Engine.
>
> I wonder why the corresponding application UI says the input rate is zero
> records per second. This is definitely wrong. I checked it while I printed
> out the incoming records to the driver console. All other metrics seem to be
> correct (at least they are realistic).
>
> What is going on here? Do you have any idea? Thanks for your help.
>
> Julian




Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Cody Koeninger
I ran your example using the versions of kafka and spark you are
using, against a standalone cluster.  This is what I observed:

(in kafka working directory)

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list 'localhost:9092' --topic simple_logtest --time -2
simple_logtest:2:0
simple_logtest:4:0
simple_logtest:1:0
simple_logtest:3:0
simple_logtest:0:0

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list 'localhost:9092' --topic simple_logtest --time -1
simple_logtest:2:31
simple_logtest:4:31
simple_logtest:1:31
simple_logtest:3:31
simple_logtest:0:31

So in other words, there are 5 partitions, and they all have messages in them.

(in spark working directory)

bash-3.2$ ./bin/spark-submit --master
spark://Codys-MacBook-Pro.local:7077 --class
example.SimpleKafkaLoggingDriver
/private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
localhost:9092 simple_logtest mygroup earliest


16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
1479574025000 ms.0 from job set of time 1479574025000 ms

simple_logtest 3 offsets: 0 to 31
simple_logtest 0 offsets: 0 to 31
simple_logtest 1 offsets: 0 to 31
simple_logtest 2 offsets: 0 to 31
simple_logtest 4 offsets: 0 to 31

16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
1479574025000 ms.0 from job set of time 1479574025000 ms
16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
1479574025000 ms (execution: 0.005 s)
16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
1479574030000 ms.0 from job set of time 1479574030000 ms

simple_logtest 3 offsets: 31 to 31
simple_logtest 0 offsets: 31 to 31
simple_logtest 1 offsets: 31 to 31
simple_logtest 2 offsets: 31 to 31
simple_logtest 4 offsets: 31 to 31

So in other words, spark is indeed seeing offsets for each partition.


The results you posted look to me like there aren't any messages going
into the other partitions, which looks like a misbehaving producer.
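
The comparison above can be done mechanically. This is a plain-Scala sketch, assuming GetOffsetShell prints lines of the form topic:partition:offset as shown; the object and function names are made up for illustration. It computes how many messages Kafka holds per partition and flags partitions that hold data but from which Spark read nothing.

```scala
object OffsetCheck {
  // Parses GetOffsetShell lines of the form "topic:partition:offset" into partition -> offset.
  def parse(lines: Seq[String]): Map[Int, Long] =
    lines.map(_.trim).filter(_.nonEmpty).map { line =>
      val parts = line.split(":")
      parts(parts.length - 2).toInt -> parts(parts.length - 1).toLong
    }.toMap

  // Messages available per partition: latest (--time -1) minus earliest (--time -2).
  def available(earliest: Map[Int, Long], latest: Map[Int, Long]): Map[Int, Long] =
    latest.map { case (p, end) => p -> (end - earliest.getOrElse(p, 0L)) }

  // Partitions that hold data according to Kafka but that Spark read nothing from.
  def missing(avail: Map[Int, Long], sparkRead: Map[Int, Long]): Set[Int] =
    avail.collect { case (p, n) if n > 0 && sparkRead.getOrElse(p, 0L) == 0L => p }.toSet

  def main(args: Array[String]): Unit = {
    val earliest = parse(Seq(
      "simple_logtest:2:0", "simple_logtest:4:0", "simple_logtest:1:0",
      "simple_logtest:3:0", "simple_logtest:0:0"))
    val latest = parse(Seq(
      "simple_logtest:2:31", "simple_logtest:4:31", "simple_logtest:1:31",
      "simple_logtest:3:31", "simple_logtest:0:31"))

    // Records Spark reported per partition in the first batch (untilOffset - fromOffset).
    val sparkRead = Map(0 -> 31L, 1 -> 31L, 2 -> 31L, 3 -> 31L, 4 -> 31L)

    val avail = available(earliest, latest)
    println(s"available per partition: ${avail.toSeq.sorted}")
    println(s"partitions Spark missed: ${missing(avail, sparkRead)}")
  }
}
```

Feeding it the reporter's own GetOffsetShell output and Spark ranges would show whether the unread partitions actually contain messages, which separates a producer problem from a consumer problem.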

On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
 wrote:
> Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have been
> struggling with this show stopper problem.
>
> When we run our drivers with auto.offset.reset=latest ingesting from a
> single kafka topic with 10 partitions, the driver reads correctly from all
> 10 partitions.
>
> However when we use auto.offset.reset=earliest, the driver will read only a
> single partition.
>
> When we turn on the debug logs, we sometimes see partitions being set to
> different offset configuration even though the consumer config correctly
> indicates auto.offset.reset=earliest.
>
>> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Sending ListOffsetRequest
>> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>> to broker 10.102.20.12:9092 (id: 12 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Sending ListOffsetRequest
>> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>> to broker 10.102.20.13:9092 (id: 13 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Received ListOffsetResponse
>> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>> from broker 10.102.20.12:9092 (id: 12 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Received ListOffsetResponse
>> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>> from broker 10.102.20.13:9092 (id: 13 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>
>
>
> I've enclosed below the completely stripped down trivial test driver that
> shows this behavior. We normally run with YARN 2.7.3 but have also tried
> running spark standalone mode which has the same behavior. Our drivers are
> normally java but we have tried the scala version which also has the same
> incorrect behavior. We have tried different LocationStrategies and partition
> assignment strategies all without success.  Any insight would be greatly
> appreciated.
>
> package com.x.labs.analytics.diagnostics.spark.drivers
>
> import org.apache.kafka.common.serialization.StringDeserializer
> import 

Re: Kafka segmentation

2016-11-19 Thread Cody Koeninger
I mean I don't understand exactly what the issue is.  Can you fill in
these blanks:

My settings are :

My code is :

I expected to see :

Instead, I saw :

On Thu, Nov 17, 2016 at 12:53 PM, Hoang Bao Thien <hbthien0...@gmail.com> wrote:
> I am sorry I don't understand your idea. What do you mean exactly?
>
> On Fri, Nov 18, 2016 at 1:50 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Ok, I don't think I'm clear on the issue then.  Can you say what the
>> expected behavior is, and what the observed behavior is?
>>
>> On Thu, Nov 17, 2016 at 10:48 AM, Hoang Bao Thien <hbthien0...@gmail.com>
>> wrote:
>> > Hi,
>> >
>> > Thanks for your comments. But in fact, I don't want to limit the size of
>> > batches, it could be any greater size as it does.
>> >
>> > Thien
>> >
>> > On Fri, Nov 18, 2016 at 1:17 AM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> If you want a consistent limit on the size of batches, use
>> >> spark.streaming.kafka.maxRatePerPartition  (assuming you're using
>> >> createDirectStream)
>> >>
>> >> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>> >>
>> >> On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien
>> >> <hbthien0...@gmail.com>
>> >> wrote:
>> >> > Hi,
>> >> >
>> >> > I push CSV and other text files into Kafka just to test Kafka + Spark
>> >> > Streaming with the direct stream. That's why I don't want Spark
>> >> > Streaming to read the CSV or text files directly.
>> >> > In addition, I don't want a giant batch of records like the one in the
>> >> > link you sent.
>> >> > The problem is that all batches should receive a "similar" number of
>> >> > records; instead, the first two or three batches have a very large
>> >> > number of records (e.g., 100K), while the last 1000 batches have only
>> >> > 200 records.
>> >> >
>> >> > I know that the problem is not caused by auto.offset.reset=largest,
>> >> > but I don't know what I can do in this case.
>> >> >
>> >> > Could you or others please suggest some solutions? This seems to be
>> >> > the normal situation with Kafka + Spark Streaming.
>> >> >
>> >> > Thanks.
>> >> > Alex
>> >> >
>> >> >
>> >> >
>> >> > On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger <c...@koeninger.org>
>> >> > wrote:
>> >> >>
>> >> >> Yeah, if you're reporting issues, please be clear as to whether
>> >> >> backpressure is enabled, and whether maxRatePerPartition is set.
>> >> >>
>> >> >> I expect that there is something wrong with backpressure, see e.g.
>> >> >> https://issues.apache.org/jira/browse/SPARK-18371
>> >> >>
>> >> >> On Wed, Nov 16, 2016 at 5:05 PM, bo yang <bobyan...@gmail.com>
>> >> >> wrote:
>> >> >> > I hit similar issue with Spark Streaming. The batch size seemed a
>> >> >> > little
>> >> >> > random. Sometime it was large with many Kafka messages inside same
>> >> >> > batch,
>> >> >> > sometimes it was very small with just a few messages. Is it
>> >> >> > possible
>> >> >> > that
>> >> >> > was caused by the backpressure implementation in Spark Streaming?
>> >> >> >
>> >> >> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger
>> >> >> > <c...@koeninger.org>
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Moved to user list.
>> >> >> >>
>> >> >> >> I'm not really clear on what you're trying to accomplish (why put
>> >> >> >> the
>> >> >> >> csv file through Kafka instead of reading it directly with
>> >> >> >> spark?)
>> >> >> >>
>> >> >> >> auto.offset.reset=largest just means that when starting the job
>> >> >> >> without any defined offsets, it will start at the highest 

[jira] [Comment Edited] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-19 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15679472#comment-15679472
 ] 

Cody Koeninger edited comment on SPARK-18475 at 11/19/16 4:02 PM:
--

Yes, an RDD does have an ordering guarantee, it's an iterator per partition, 
same as Kafka.  Yes, that guarantee is part of the Kafka data model (Burak, if 
you don't believe me, go reread 
http://kafka.apache.org/documentation.html#introduction  search for "order").  
Because the direct stream (and the structured stream that uses the same model) 
has a 1:1 correspondence between kafka partition and spark partition, that 
guarantee is preserved.  The existing distortions between the Kafka model and 
the direct stream / structured stream are enough as it is, we don't need to add 
more.



was (Author: c...@koeninger.org):
Yes, an RDD does have an ordering guarantee, it's an iterator per partition, 
same as Kafka.  Yes, that guarantee is part of the Kafka data model (Burak, if 
you don't believe me, go reread 
http://kafka.apache.org/documentation.html#introduction  search for "order").  
Because the direct stream (and the structured stream that uses the same model) 
has a 1:! correspondence between kafka partition and spark partition, that 
guarantee is preserved.  The existing distortions between the Kafka model and 
the direct stream / structured stream are enough as it is, we don't need to add 
more.


> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-19 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15679472#comment-15679472
 ] 

Cody Koeninger commented on SPARK-18475:


Yes, an RDD does have an ordering guarantee, it's an iterator per partition, 
same as Kafka.  Yes, that guarantee is part of the Kafka data model (Burak, if 
you don't believe me, go reread 
http://kafka.apache.org/documentation.html#introduction  search for "order").  
Because the direct stream (and the structured stream that uses the same model) 
has a 1:1 correspondence between kafka partition and spark partition, that 
guarantee is preserved.  The existing distortions between the Kafka model and 
the direct stream / structured stream are enough as it is, we don't need to add 
more.


> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.






Re: Kafka segmentation

2016-11-17 Thread Cody Koeninger
Ok, I don't think I'm clear on the issue then.  Can you say what the
expected behavior is, and what the observed behavior is?

On Thu, Nov 17, 2016 at 10:48 AM, Hoang Bao Thien <hbthien0...@gmail.com> wrote:
> Hi,
>
> Thanks for your comments. But in fact, I don't want to limit the size of
> batches, it could be any greater size as it does.
>
> Thien
>
> On Fri, Nov 18, 2016 at 1:17 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> If you want a consistent limit on the size of batches, use
>> spark.streaming.kafka.maxRatePerPartition  (assuming you're using
>> createDirectStream)
>>
>> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>>
>> On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien <hbthien0...@gmail.com>
>> wrote:
>> > Hi,
>> >
>> > I use CSV and other text files to Kafka just to test Kafka + Spark
>> > Streaming
>> > by using direct stream. That's why I don't want Spark streaming reads
>> > CSVs
>> > or text files directly.
>> > In addition, I don't want a giant batch of records like the link you
>> > sent.
>> > The problem is that we should receive the "similar" number of record of
>> > all
>> > batchs instead of the first two or three batches have so large number of
>> > records (e.g., 100K) but the last 1000 batches with only 200 records.
>> >
>> > I know that the problem is not from the auto.offset.reset=largest, but I
>> > don't know what I can do in this case.
>> >
>> > Do you and other ones could suggest me some solutions please as this
>> > seems
>> > the normal situation with Kafka+SpartStreaming.
>> >
>> > Thanks.
>> > Alex
>> >
>> >
>> >
>> > On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> Yeah, if you're reporting issues, please be clear as to whether
>> >> backpressure is enabled, and whether maxRatePerPartition is set.
>> >>
>> >> I expect that there is something wrong with backpressure, see e.g.
>> >> https://issues.apache.org/jira/browse/SPARK-18371
>> >>
>> >> On Wed, Nov 16, 2016 at 5:05 PM, bo yang <bobyan...@gmail.com> wrote:
>> >> > I hit similar issue with Spark Streaming. The batch size seemed a
>> >> > little
>> >> > random. Sometime it was large with many Kafka messages inside same
>> >> > batch,
>> >> > sometimes it was very small with just a few messages. Is it possible
>> >> > that
>> >> > was caused by the backpressure implementation in Spark Streaming?
>> >> >
>> >> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger <c...@koeninger.org>
>> >> > wrote:
>> >> >>
>> >> >> Moved to user list.
>> >> >>
>> >> >> I'm not really clear on what you're trying to accomplish (why put
>> >> >> the
>> >> >> csv file through Kafka instead of reading it directly with spark?)
>> >> >>
>> >> >> auto.offset.reset=largest just means that when starting the job
>> >> >> without any defined offsets, it will start at the highest (most
>> >> >> recent) available offsets.  That's probably not what you want if
>> >> >> you've already loaded csv lines into kafka.
>> >> >>
>> >> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien
>> >> >> <hbthien0...@gmail.com>
>> >> >> wrote:
>> >> >> > Hi all,
>> >> >> >
>> >> >> > I would like to ask a question related to the size of Kafka
>> >> >> > stream. I
>> >> >> > want
>> >> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming
>> >> >> > to
>> >> >> > get
>> >> >> > the
>> >> >> > output from Kafka and then save to Hive by using SparkSQL. The
>> >> >> > file
>> >> >> > csv
>> >> >> > is
>> >> >> > about 100MB with ~250K messages/rows (Each row has about 10 fields
>> >> >> > of
>> >> >> > integer). I see that Spark Streaming first received two
>> >> >> > partitions/batches,
>> >> >> > the first is of 60K messages and the second is of 50K msgs. But
>> >> >> > from
>> >> >> > the
>> >> >> > third batch, Spark just received 200 messages for each batch (or
>> >> >> > partition).
>> >> >> > I think that this problem is coming from Kafka or some
>> >> >> > configuration
>> >> >> > in
>> >> >> > Spark. I already tried to configure with the setting
>> >> >> > "auto.offset.reset=largest", but every batch only gets 200
>> >> >> > messages.
>> >> >> >
>> >> >> > Could you please tell me how to fix this problem?
>> >> >> > Thank you so much.
>> >> >> >
>> >> >> > Best regards,
>> >> >> > Alex
>> >> >> >
>> >> >>
>> >> >>
>> >> >> -
>> >> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >>
>> >> >
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15674459#comment-15674459
 ] 

Cody Koeninger commented on SPARK-18475:


This has come up several times, and my answer is consistently the same - as 
Ofir said, the Kafka model is parallelism bounded by number of partitions.  
Breaking that model breaks user expectations, e.g. concerning ordering.  It's 
fine for you if this helps your specific use case, but I think it is not 
appropriate for general use.  I'd recommend people fix their skew and/or 
repartition at the producer level.  

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.






Re: Kafka segmentation

2016-11-17 Thread Cody Koeninger
If you want a consistent limit on the size of batches, use
spark.streaming.kafka.maxRatePerPartition  (assuming you're using
createDirectStream)

http://spark.apache.org/docs/latest/configuration.html#spark-streaming
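
To make the effect of that setting concrete: it caps the number of records
read per second from each Kafka partition, so a batch can hold at most
rate x partitions x batch interval records.  A minimal sketch of that
arithmetic in plain Java follows.  The class name and the numbers (1000
records/sec, 4 partitions, 5 second batches) are made up for illustration
and are not taken from this thread.

```java
final class BatchBound {
    // Upper bound on records in one batch when
    // spark.streaming.kafka.maxRatePerPartition is set.
    static long maxRecordsPerBatch(long maxRatePerPartition, int partitions, long batchSeconds) {
        return maxRatePerPartition * partitions * batchSeconds;
    }

    public static void main(String[] args) {
        // Hypothetical job: 1000 records/sec/partition, 4 partitions, 5 second batches.
        System.out.println(maxRecordsPerBatch(1000, 4, 5)); // prints 20000
    }
}
```

On the command line the setting itself would be passed as something like
--conf spark.streaming.kafka.maxRatePerPartition=1000.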

On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien <hbthien0...@gmail.com> wrote:
> Hi,
>
> I load CSV and other text files into Kafka just to test Kafka + Spark
> Streaming with the direct stream. That's why I don't want Spark Streaming
> to read the CSVs or text files directly.
> In addition, I don't want a giant batch of records like the one in the link
> you sent.
> The problem is that every batch should receive a "similar" number of records,
> but instead the first two or three batches have a very large number of
> records (e.g., 100K) while the last 1000 batches have only 200 records each.
>
> I know that the problem does not come from auto.offset.reset=largest, but I
> don't know what I can do in this case.
>
> Could you or others please suggest some solutions, as this seems to be a
> normal situation with Kafka + Spark Streaming?
>
> Thanks.
> Alex
>
>
>
> On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Yeah, if you're reporting issues, please be clear as to whether
>> backpressure is enabled, and whether maxRatePerPartition is set.
>>
>> I expect that there is something wrong with backpressure, see e.g.
>> https://issues.apache.org/jira/browse/SPARK-18371
>>
>> On Wed, Nov 16, 2016 at 5:05 PM, bo yang <bobyan...@gmail.com> wrote:
>> > I hit similar issue with Spark Streaming. The batch size seemed a little
>> > random. Sometime it was large with many Kafka messages inside same
>> > batch,
>> > sometimes it was very small with just a few messages. Is it possible
>> > that
>> > was caused by the backpressure implementation in Spark Streaming?
>> >
>> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> Moved to user list.
>> >>
>> >> I'm not really clear on what you're trying to accomplish (why put the
>> >> csv file through Kafka instead of reading it directly with spark?)
>> >>
>> >> auto.offset.reset=largest just means that when starting the job
>> >> without any defined offsets, it will start at the highest (most
>> >> recent) available offsets.  That's probably not what you want if
>> >> you've already loaded csv lines into kafka.
>> >>
>> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien
>> >> <hbthien0...@gmail.com>
>> >> wrote:
>> >> > Hi all,
>> >> >
>> >> > I would like to ask a question related to the size of Kafka stream. I
>> >> > want
>> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to
>> >> > get
>> >> > the
>> >> > output from Kafka and then save to Hive by using SparkSQL. The file
>> >> > csv
>> >> > is
>> >> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
>> >> > integer). I see that Spark Streaming first received two
>> >> > partitions/batches,
>> >> > the first is of 60K messages and the second is of 50K msgs. But from
>> >> > the
>> >> > third batch, Spark just received 200 messages for each batch (or
>> >> > partition).
>> >> > I think that this problem is coming from Kafka or some configuration
>> >> > in
>> >> > Spark. I already tried to configure with the setting
>> >> > "auto.offset.reset=largest", but every batch only gets 200 messages.
>> >> >
>> >> > Could you please tell me how to fix this problem?
>> >> > Thank you so much.
>> >> >
>> >> > Best regards,
>> >> > Alex
>> >> >
>> >>
>> >> -
>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>
>> >
>
>




Re: Kafka segmentation

2016-11-16 Thread Cody Koeninger
Yeah, if you're reporting issues, please be clear as to whether
backpressure is enabled, and whether maxRatePerPartition is set.

I expect that there is something wrong with backpressure, see e.g.
https://issues.apache.org/jira/browse/SPARK-18371

On Wed, Nov 16, 2016 at 5:05 PM, bo yang <bobyan...@gmail.com> wrote:
> I hit a similar issue with Spark Streaming. The batch size seemed a little
> random. Sometimes it was large, with many Kafka messages inside the same
> batch; sometimes it was very small, with just a few messages. Is it possible
> that this was caused by the backpressure implementation in Spark Streaming?
>
> On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Moved to user list.
>>
>> I'm not really clear on what you're trying to accomplish (why put the
>> csv file through Kafka instead of reading it directly with spark?)
>>
>> auto.offset.reset=largest just means that when starting the job
>> without any defined offsets, it will start at the highest (most
>> recent) available offsets.  That's probably not what you want if
>> you've already loaded csv lines into kafka.
>>
>> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien <hbthien0...@gmail.com>
>> wrote:
>> > Hi all,
>> >
>> > I would like to ask a question related to the size of Kafka stream. I
>> > want
>> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to get
>> > the
>> > output from Kafka and then save to Hive by using SparkSQL. The file csv
>> > is
>> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
>> > integer). I see that Spark Streaming first received two
>> > partitions/batches,
>> > the first is of 60K messages and the second is of 50K msgs. But from the
>> > third batch, Spark just received 200 messages for each batch (or
>> > partition).
>> > I think that this problem is coming from Kafka or some configuration in
>> > Spark. I already tried to configure with the setting
>> > "auto.offset.reset=largest", but every batch only gets 200 messages.
>> >
>> > Could you please tell me how to fix this problem?
>> > Thank you so much.
>> >
>> > Best regards,
>> > Alex
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>




Re: Kafka segmentation

2016-11-16 Thread Cody Koeninger
Moved to user list.

I'm not really clear on what you're trying to accomplish (why put the
csv file through Kafka instead of reading it directly with spark?)

auto.offset.reset=largest just means that when starting the job
without any defined offsets, it will start at the highest (most
recent) available offsets.  That's probably not what you want if
you've already loaded csv lines into kafka.
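
To illustrate that rule, here is a small sketch in plain Java of how the
starting offset gets picked.  It is a simplified model of the behavior
described above, not Spark's or Kafka's actual code.  The class and method
names are hypothetical, and "smallest" / "largest" are the values accepted
by the 0.8 consumer.

```java
import java.util.OptionalLong;

final class StartingOffset {
    // Defined offsets (e.g. from a checkpoint or passed in explicitly) always win;
    // auto.offset.reset is only consulted when there are none.
    static long resolve(OptionalLong definedOffset, String autoOffsetReset,
                        long earliestAvailable, long latestAvailable) {
        if (definedOffset.isPresent()) {
            return definedOffset.getAsLong();
        }
        switch (autoOffsetReset) {
            case "smallest":
                return earliestAvailable;
            case "largest":
                return latestAvailable;
            default:
                throw new IllegalArgumentException(
                    "unsupported auto.offset.reset: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Hypothetical partition that already holds 250000 csv rows.
        long earliest = 0L;
        long latest = 250000L;
        System.out.println(resolve(OptionalLong.empty(), "largest", earliest, latest));   // prints 250000
        System.out.println(resolve(OptionalLong.empty(), "smallest", earliest, latest));  // prints 0
        System.out.println(resolve(OptionalLong.of(1200L), "largest", earliest, latest)); // prints 1200
    }
}
```

With "largest" and no defined offsets, a job started after the load begins
past every row that is already in the topic.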

On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien  wrote:
> Hi all,
>
> I would like to ask a question related to the size of a Kafka stream. I want
> to put data (e.g., a *.csv file) into Kafka, then use Spark Streaming to read
> it from Kafka and save it to Hive using Spark SQL. The CSV file is about
> 100MB with ~250K messages/rows (each row has about 10 integer fields). I see
> that Spark Streaming first received two partitions/batches: the first had 60K
> messages and the second had 50K messages. But from the third batch on, Spark
> received only 200 messages per batch (or partition).
> I think that this problem comes from Kafka or from some configuration in
> Spark. I already tried the setting "auto.offset.reset=largest", but every
> batch still gets only 200 messages.
>
> Could you please tell me how to fix this problem?
> Thank you so much.
>
> Best regards,
> Alex
>




Re: using Spark Streaming with Kafka 0.9/0.10

2016-11-15 Thread Cody Koeninger
Generating / defining an RDD is not the same thing as running the
compute() method of an RDD.  The direct stream definitely runs Kafka
consumers on the executors.

If you want more info, the blog post and video linked from
https://github.com/koeninger/kafka-exactly-once refer to the 0.8
implementation, but the general design is similar for the 0.10
version.

I think the likelihood of an official release supporting 0.9 is fairly
slim at this point, it's a year out of date and wouldn't be a drop-in
dependency change.


On Tue, Nov 15, 2016 at 5:50 PM, aakash aakash <email2aak...@gmail.com> wrote:
>
>
>> You can use the 0.8 artifact to consume from a 0.9 broker
>
> We are currently using "Camus" in production, and one of the main goals of
> moving to Spark is to use the new Kafka Consumer API of Kafka 0.9. In our
> case we need the security provisions available in 0.9, which is why we
> cannot use the 0.8 client.
>
>> Where are you reading documentation indicating that the direct stream
>> only runs on the driver?
>
> I might be wrong here, but I see that the new Kafka + Spark streaming code
> extends InputDStream, and its documentation says: "Input streams that can
> generate RDDs from new data by running a service/thread only on the driver
> node (that is, without running a receiver on worker nodes)".
>
> Thanks and regards,
> Aakash Pradeep
>
>
> On Tue, Nov 15, 2016 at 2:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> It'd probably be worth no longer marking the 0.8 interface as
>> experimental.  I don't think it's likely to be subject to active
>> development at this point.
>>
>> You can use the 0.8 artifact to consume from a 0.9 broker
>>
>> Where are you reading documentation indicating that the direct stream
>> only runs on the driver?  It runs consumers on the worker nodes.
>>
>>
>> On Tue, Nov 15, 2016 at 10:58 AM, aakash aakash <email2aak...@gmail.com>
>> wrote:
>> > Re-posting it at dev group.
>> >
>> > Thanks and Regards,
>> > Aakash
>> >
>> >
>> > -- Forwarded message --
>> > From: aakash aakash <email2aak...@gmail.com>
>> > Date: Mon, Nov 14, 2016 at 4:10 PM
>> > Subject: using Spark Streaming with Kafka 0.9/0.10
>> > To: user-subscr...@spark.apache.org
>> >
>> >
>> > Hi,
>> >
>> > I am planning to use Spark Streaming to consume messages from Kafka 0.9.
>> > I
>> > have couple of questions regarding this :
>> >
>> > I see APIs are annotated with @Experimental. So can you please tell me
>> > when
>> > are we planning to make it production ready ?
>> > Currently, I see we are using Kafka 0.10 and so curious to know why not
>> > we
>> > started with 0.9 Kafka instead of 0.10 Kafka. As I see 0.10 kafka client
>> > would not be compatible with 0.9 client since there are some changes in
>> > arguments in consumer API.
>> > Current API extends InputDstream and as per document it means RDD will
>> > be
>> > generated by running a service/thread only on the driver node instead of
>> > worker node. Can you please explain to me why we are doing this and what
>> > is
>> > required to make sure that it runs on worker node.
>> >
>> >
>> > Thanks in advance !
>> >
>> > Regards,
>> > Aakash
>> >
>
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: using Spark Streaming with Kafka 0.9/0.10

2016-11-15 Thread Cody Koeninger
It'd probably be worth no longer marking the 0.8 interface as
experimental.  I don't think it's likely to be subject to active
development at this point.

You can use the 0.8 artifact to consume from a 0.9 broker

Where are you reading documentation indicating that the direct stream
only runs on the driver?  It runs consumers on the worker nodes.


On Tue, Nov 15, 2016 at 10:58 AM, aakash aakash  wrote:
> Re-posting it at dev group.
>
> Thanks and Regards,
> Aakash
>
>
> -- Forwarded message --
> From: aakash aakash 
> Date: Mon, Nov 14, 2016 at 4:10 PM
> Subject: using Spark Streaming with Kafka 0.9/0.10
> To: user-subscr...@spark.apache.org
>
>
> Hi,
>
> I am planning to use Spark Streaming to consume messages from Kafka 0.9. I
> have a couple of questions regarding this:
>
> I see the APIs are annotated with @Experimental. Can you please tell me when
> we are planning to make them production ready?
> Currently, I see we are using Kafka 0.10, and I am curious why we did not
> start with Kafka 0.9 instead. As far as I can see, the 0.10 Kafka client is
> not compatible with the 0.9 client, since there are some changes in the
> arguments of the consumer API.
> The current API extends InputDStream, and per the documentation this means
> RDDs will be generated by running a service/thread only on the driver node
> instead of the worker nodes. Can you please explain why we are doing this and
> what is required to make sure that it runs on the worker nodes?
>
>
> Thanks in advance !
>
> Regards,
> Aakash
>




Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-13 Thread Cody Koeninger
Preferred locations are only advisory, you can still get tasks scheduled on
other executors.  You can try bumping up the size of the cache to see if
that is causing the issue you're seeing.
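
To see why the cache size could matter with the numbers quoted below (about
32 partitions in flight at once, but 400-600 distinct partitions over the
whole job, against a default max of 64), here is a toy LRU model in plain
Java.  It is only a sketch: the class is hypothetical and is not Spark's
consumer cache, and it assumes each partition is touched once.  The setting
to raise is spark.streaming.kafka.consumer.cache.maxCapacity, per the
integration guide.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ConsumerCacheModel {
    // Counts how many cached entries get evicted when `distinctPartitions`
    // different keys pass through an LRU cache holding at most `maxCapacity`.
    static int countEvictions(int maxCapacity, int distinctPartitions) {
        final int[] evictions = {0};
        Map<Integer, String> cache = new LinkedHashMap<Integer, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
                if (size() > maxCapacity) {
                    evictions[0]++;
                    return true;
                }
                return false;
            }
        };
        for (int partition = 0; partition < distinctPartitions; partition++) {
            cache.put(partition, "consumer-" + partition);
        }
        return evictions[0];
    }

    public static void main(String[] args) {
        System.out.println(countEvictions(64, 32));   // prints 0
        System.out.println(countEvictions(64, 600));  // prints 536
        System.out.println(countEvictions(600, 600)); // prints 0
    }
}
```

In this model nothing is evicted while only the 32 concurrent partitions are
in play, but cycling through all 600 forces evictions unless the cap is
raised to cover them.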

On Nov 13, 2016 12:47, "Ivan von Nagy" <i...@vadio.com> wrote:

> As the code iterates through the parallel list, it is processing up to 8
> KafkaRDDs at a time. Each has its own unique topic and consumer group now.
> Every topic has 4 partitions, so technically there should never be more
> than 32 CachedKafkaConsumers. However, this seems not to be the case, as we
> are using the default settings for cache size (16 initial -> 64 max) and
> PreferConsistent for the location strategy. I do notice the concurrent
> modification exception occurs when a cached consumer is being dropped out
> of the cache when it reaches the max, 64. After looking at the code, the
> KafkaRDDIterator will only close its consumer if we are not caching (makes
> sense), but there is no other way to close/drop the consumer until it gets
> dropped from the cache. Perhaps there is an issue with resources here, since
> RDDs don't inherently have any resource management support, like "I am done
> so clean up now".
>
> Over the course of this job, it will probably process upwards of 100-150
> different channels so about 400-600 partitions. Does this mean we should
> bump the cache size that high even though only about 8 channels (32
> partitions) are being handled by the executors at any given time?
>
> Thanks,
>
> Ivan
>
> On Sat, Nov 12, 2016 at 1:25 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> You should not be getting consumer churn on executors at all, that's
>> the whole point of the cache.  How many partitions are you trying to
>> process per executor?
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies
>>
>> gives instructions on the default size of the cache and how to increase
>> it.
>>
>> On Sat, Nov 12, 2016 at 2:15 PM, Ivan von Nagy <i...@vadio.com> wrote:
>> > Hi Sean,
>> >
>> > Thanks for responding. We have run our jobs with internal parallel
>> > processing for well over a year (Spark 1.5, 1.6 and Kafka 0.8.2.2) and did
>> > not encounter any of these issues until upgrading to Spark 2.0.1 and Kafka
>> > 0.10 clients. If we process serially, then we sometimes get the errors, but
>> > far less often. Also, if done sequentially it sometimes takes more than 2x
>> > as long, which is not an option for this particular job.
>> >
>> > I posted another example on Nov 10th, which is the example below. We
>> > basically iterate through a list in parallel, and sometimes the list could
>> > be upwards of a hundred elements. The parallelism in Scala/Spark limits us
>> > to about 8 at a time on our nodes. For performance reasons we process in
>> > parallel, and we also separate each channel since each has its own topic.
>> > We don't combine all into one KafkaRDD because that means we have to
>> > process all or nothing if an error occurs. This way, if a couple of
>> > channels fail, we can re-run the job and it will only process those
>> > channels.
>> >
>> > This has just been perplexing since we had never encountered any errors
>> > for well over a year using the prior versions. At this time, I am just
>> > seeking any configuration options or code changes that we may be missing
>> > or even, at a lower level, what fundamentally changed in Spark 2 and Kafka
>> > 0.10 that surfaced these issues.
>> >
>> > We continue to use Spark 1.6 with the Kafka 0.8.x clients until this can
>> > be figured out; however, it is a deal breaker for us to upgrade to Spark
>> > 2.x with Kafka 0.10 clients. On a side note, we have not encountered any
>> > issues with the Kafka Producers; this is simply with the KafkaRDD and its
>> > use of CachedKafkaConsumer. Any help is much appreciated.
>> >
>> > Thanks,
>> >
>> > Ivan
>> >
>> > Example usage with KafkaRDD:
>> > val channels = Seq("channel1", "channel2")
>> >
>> > channels.toParArray.foreach { channel =>
>> >   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>> >
>> >   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>> >   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>> >

Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread Cody Koeninger
Can you come up with a minimal reproducible example?

Probably unrelated, but why are you doing a union of 3 streams?
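
For comparison when building that example: within a single batch,
reduceByKey with (a, b) -> a should leave exactly one value per key.  A
plain-Java analogue of that expectation is sketched below.  It uses no
Spark at all, and the Campaign class and sample data are hypothetical
stand-ins, not the code from this thread.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class DedupByKey {
    // Hypothetical stand-in for the poster's campaign type.
    static final class Campaign {
        final String customer;
        final String id;

        Campaign(String customer, String id) {
            this.customer = customer;
            this.id = id;
        }

        String key() {
            return customer + "_" + id;
        }
    }

    // Keeps the first campaign seen for each customer_id key, like a
    // reduce with (first, second) -> first.
    static Map<String, Campaign> uniqueByKey(List<Campaign> batch) {
        return batch.stream().collect(Collectors.toMap(
            Campaign::key, c -> c, (first, second) -> first, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        List<Campaign> batch = Arrays.asList(
            new Campaign("alice", "c1"),
            new Campaign("alice", "c1"),
            new Campaign("bob", "c1"));
        System.out.println(uniqueByKey(batch).keySet()); // prints [alice_c1, bob_c1]
    }
}
```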

On Sat, Nov 12, 2016 at 10:29 AM, dev loper <spark...@gmail.com> wrote:
> There are no failures or errors.  Regardless, I am still seeing
> duplicates. All the steps and stages are successful, and speculation
> is turned off.
>
> On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Are you certain you aren't getting any failed tasks or other errors?
>> Output actions like foreach aren't exactly once and will be retried on
>> failures.
>>
>>
>> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote:
>>>
>>> Dear fellow Spark Users,
>>>
>>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
>>> listens to campaigns based on live stock feeds, and the batch duration is 5
>>> seconds. The application uses the Kafka direct stream, and based on the feed
>>> source there are three streams. As shown in the code snippet, I am doing a
>>> union of the three streams and trying to remove the duplicate campaigns
>>> received, using reduceByKey keyed on the customer and campaignId. I can see
>>> a lot of duplicate emails being sent out for the same key in the same batch.
>>> I was expecting reduceByKey to remove the duplicate campaigns in a batch based
>>> on customer and campaignId. In the logs I am even printing the key and batch
>>> time before sending the email, and I can clearly see duplicates. I could
>>> see some duplicates getting removed after adding logging in the reduceByKey
>>> function, but it is not eliminating them completely.
>>>
>>> // Union the three streams and cache the result.
>>> JavaDStream<Campaign> matchedCampaigns =
>>>     stream1.transform(CmpManager::getMatchedCampaigns)
>>>         .union(stream2).union(stream3).cache();
>>>
>>> // Key each campaign by customer and campaign id, then keep one per key.
>>> JavaPairDStream<String, Campaign> uniqueCampaigns =
>>>     matchedCampaigns.mapToPair(campaign -> {
>>>         String key = campaign.getCustomer() + "_" + campaign.getId();
>>>         return new Tuple2<String, Campaign>(key, campaign);
>>>     })
>>>     .reduceByKey((campaign1, campaign2) -> campaign1);
>>>
>>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>>>
>>> I am not able to figure out where I am going wrong here. Please help me
>>> get rid of this weird problem. Previously we were using createStream
>>> to listen to the Kafka queue (1 partition), and there we didn't face
>>> this issue. But when we moved to the direct stream (100 partitions), we
>>> could easily reproduce this issue under high load.
>>>
>>> Note: I even tried reduceByKeyAndWindow with a duration of 5 seconds
>>> instead of the reduceByKey operation, but even that didn't help:
>>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1, Durations.seconds(5),
>>> Durations.seconds(5))
>>>
>>> I have also asked for help on Stack Overflow, but I haven't received
>>> any solutions to this issue.
>>>
>>> Stack Overflow Link
>>> 
>>>
>>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>>>
>>>
>>> Thanks and Regards
>>> Dev
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-12 Thread Cody Koeninger
et a list of channels, iterate
>> through them in parallel, load a KafkaRDD using a given topic and a consumer
>> group that is made from the topic (each RDD uses a different topic and
>> group), process the data and write to Parquet files.
>>
>> Per my Nov 10th post, we still get polling timeouts unless the poll.ms is
>> set to something like 10 seconds. We also get concurrent modification
>> exceptions as well. I believe the key here is the processing of data in
>> parallel is where we encounter issues so we are looking for some possible
>> answers surrounding this.
>>
>> Thanks,
>>
>> Ivan
>>
>>
>> On Fri, Nov 11, 2016 at 12:12 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>>
>>> It is already documented that you must use a different group id, which as
>>> far as I can tell you are still not doing.
>>>
>>>
>>> On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
>>> wrote:
>>>>
>>>> Yeah, the KafkaRDD cannot be reused. It's better to document it.
>>>>
>>>> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <i...@vadio.com> wrote:
>>>>>
>>>>> Ok, I have split the KafkaRDD logic so that each uses its own group, and
>>>>> bumped poll.ms to 10 seconds. Anything less than 2 seconds on
>>>>> poll.ms ends up with a timeout and exception, so I am still perplexed on
>>>>> that one. The new error I am getting now is a `ConcurrentModificationException`
>>>>> when Spark is trying to remove the CachedKafkaConsumer.
>>>>>
>>>>> java.util.ConcurrentModificationException: KafkaConsumer is not safe
>>>>> for multi-threaded access
>>>>> at
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>>>>> at
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>>>>> at
>>>>> org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>>>>> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>>>>>
>>>>> Here is the basic logic:
>>>>>
>>>>> Using KafkaRDD - This takes a list of channels and processes them in
>>>>> parallel using the KafkaRDD directly. They each use a distinct consumer
>>>>> group (s"$prefix-$topic"), each has its own topic, and each topic has
>>>>> 4 partitions. We routinely get timeout errors when polling for data when
>>>>> poll.ms is less than 2 seconds. This occurs whether or not we process in
>>>>> parallel.
>>>>> Example usage with KafkaRDD:
>>>>> val channels = Seq("channel1", "channel2")
>>>>>
>>>>> channels.toParArray.foreach { channel =>
>>>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>>>
>>>>>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>>>>>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>>>>>
>>>>>   val ds = KafkaUtils.createRDD[K, V](context,
>>>>> kafkaParams asJava,
>>>>> offsetRanges,
>>>>> PreferConsistent).toDS[V]
>>>>>
>>>>>   // Do some aggregations
>>>>>   ds.agg(...)
>>>>>   // Save the data
>>>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>>>>>   // Save offsets using a KafkaConsumer
>>>>>   consumer.commitSync(newOffsets.asJava)
>>>>>   consumer.close()
>>>>> }
>>>>>
>>>>> I am not sure why the concurrency issue is there; I have tried to
>>>>> debug it and have also looked at the KafkaConsumer code, and from what I
>>>>> can see it should not occur. The things to figure out are why this occurs
>>>>> when running in parallel, and why the timeouts still occur.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Ivan
>>>>>
>>>>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>>
>>>>>> There definitely is

Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread Cody Koeninger
Are you certain you aren't getting any failed tasks or other errors?
Output actions like foreach aren't exactly once and will be retried on
failures.

On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote:

> Dear fellow Spark Users,
>
> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
> listens to campaigns based on live stock feeds, and the batch duration is 5
> seconds. The application uses Kafka DirectStream, and based on the feed
> source there are three streams. As given in the code snippet, I am doing a
> union of the three streams and trying to remove the duplicate campaigns
> received using reduceByKey based on the customer and campaignId. I could
> see a lot of duplicate emails being sent out for the same key in the same
> batch. I was expecting reduceByKey to remove the duplicate campaigns in a
> batch based on customer and campaignId. In the logs I am even printing the
> key and batch time before sending the email, and I could clearly see
> duplicates. I could see some duplicates getting removed after adding a log
> in the reduceByKey function, but it is not eliminating them completely.
>
> JavaDStream<Campaign> matchedCampaigns =
> stream1.transform(CmpManager::getMatchedCampaigns)
> .union(stream2).union(stream3).cache();
> JavaPairDStream<String, Campaign> uniqueCampaigns =
> matchedCampaigns.mapToPair(campaign->{
> String key=campaign.getCustomer()+"_"+campaign.getId();
> return new Tuple2<String, Campaign>(key, campaign);
> }).reduceByKey((campaign1, campaign2)->{return campaign1;});
>
> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>
> I am not able to figure out where I am going wrong here. Please help me
> get rid of this weird problem. Previously we were using
> createStream to listen to the Kafka queue (number of partitions 1), and there
> we didn't face this issue. But when we moved to directStream (number of
> partitions 100) we could easily reproduce this issue under high load.
>
> *Note:* I even tried reduceByKeyAndWindow with a duration of 5 seconds
> instead of the reduceByKey operation, but even that didn't help.
>
> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1, Durations.seconds(5),
> Durations.seconds(5))
>
> I have even asked for help on Stack Overflow, but I haven't received
> any solutions to this issue.
>
>
> *Stack Overflow Link*
> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>
>
> Thanks and Regards
> Dev
>
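
Editor's sketch of the point in the reply above: because output actions can be retried after a task failure, the email-sending step has to tolerate being called twice for the same key. The Java class below is a minimal, hypothetical illustration (IdempotentEmailSink and sendOnce are not part of Spark or of the poster's CmpManager). It keeps the set of already-sent keys in memory, which is an assumption made only to keep the example self-contained; on a real cluster that set would have to live in a store shared by all executors.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not a Spark API: makes "send" safe to retry.
final class IdempotentEmailSink {
    // Keys that have already been sent. In-memory only for illustration;
    // a real job would need an external store visible to every executor.
    private final Set<String> sentKeys = ConcurrentHashMap.newKeySet();
    // Stand-in for the real mail system, so the example has observable output.
    private final List<String> outbox = Collections.synchronizedList(new ArrayList<>());

    /** Sends at most once per (customer, campaignId, batchTime); returns false on a repeat. */
    boolean sendOnce(String customer, String campaignId, long batchTimeMs) {
        String key = customer + "_" + campaignId + "@" + batchTimeMs;
        if (!sentKeys.add(key)) {
            return false; // already sent, e.g. by an earlier attempt of a retried task
        }
        outbox.add(key);
        return true;
    }

    int sentCount() {
        return outbox.size();
    }

    public static void main(String[] args) {
        IdempotentEmailSink sink = new IdempotentEmailSink();
        System.out.println(sink.sendOnce("alice", "c1", 1000L)); // first attempt
        System.out.println(sink.sendOnce("alice", "c1", 1000L)); // retried attempt
        System.out.println(sink.sentCount());
    }
}
```

With a sink like this, a retried task changes nothing downstream; without one, reduceByKey can deduplicate within a batch and duplicates can still appear whenever the output step itself runs more than once.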


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-11 Thread Cody Koeninger
It is already documented that you must use a different group id, which as
far as I can tell you are still not doing.

On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
wrote:

> Yeah, the KafkaRDD cannot be reused. It's better to document it.
>
> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <i...@vadio.com> wrote:
>
>> Ok, I have split the KafkaRDD logic so that each uses its own group, and
>> bumped poll.ms to 10 seconds. Anything less than 2 seconds on poll.ms
>> ends up with a timeout and exception, so I am still perplexed on that one.
>> The new error I am getting now is a `ConcurrentModificationException`
>> when Spark is trying to remove the CachedKafkaConsumer.
>>
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access
>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>>
>> Here is the basic logic:
>>
>> *Using KafkaRDD* - This takes a list of channels and processes them in
>> parallel using the KafkaRDD directly. They each use a distinct consumer
>> group (s"$prefix-$topic"), each has its own topic, and each topic
>> has 4 partitions. We routinely get timeout errors when polling for data
>> when poll.ms is less than 2 seconds. This occurs whether or not we process
>> in parallel.
>>
>> *Example usage with KafkaRDD:*
>> val channels = Seq("channel1", "channel2")
>>
>> channels.toParArray.foreach { channel =>
>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>
>>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>>
>>   val ds = KafkaUtils.createRDD[K, V](context,
>> kafkaParams asJava,
>> offsetRanges,
>> PreferConsistent).toDS[V]
>>
>>   // Do some aggregations
>>   ds.agg(...)
>>   // Save the data
>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>>   // Save offsets using a KafkaConsumer
>>   consumer.commitSync(newOffsets.asJava)
>>   consumer.close()
>> }
>>
>> I am not sure why the concurrency issue is there; I have tried to debug
>> it and have also looked at the KafkaConsumer code, and from what I can see
>> it should not occur. The things to figure out are why this occurs when
>> running in parallel, and why the timeouts still occur.
>>
>> Thanks,
>>
>> Ivan
>>
>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> There definitely is Kafka documentation indicating that you should use
>>> a different consumer group for logically different subscribers, this
>>> is really basic to Kafka:
>>>
>>> http://kafka.apache.org/documentation#intro_consumers
>>>
>>> As for your comment that "commit async after each RDD, which is not
>>> really viable also", how is it not viable?  Again, committing offsets
>>> to Kafka doesn't give you reliable delivery semantics unless your
>>> downstream data store is idempotent.  If your downstream data store is
>>> idempotent, then it shouldn't matter to you when offset commits
>>> happen, as long as they happen within a reasonable time after the data
>>> is written.
>>>
>>> Do you want to keep arguing with me, or follow my advice and proceed
>>> with debugging any remaining issues after you make the changes I
>>> suggested?
>>>
>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <i...@vadio.com> wrote:
>>> > With our stream version, we update the offsets for only the partition
>>> > we are operating on. We even break down the partition into smaller batches
>>> > and then update the offsets after each batch within the partition. With
>>> > Spark 1.6 and Kafka 0.8.x this was not an issue, and as Sean pointed out,
>>> > this is not necessarily a Spark issue since Kafka no longer allows you to
>>> > simply update the offsets for a given consumer group. You have to subscribe
>>> > or assign partitions to even do so.
>>>

[jira] [Commented] (SPARK-18386) Batch mode SQL source for Kafka

2016-11-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15654308#comment-15654308
 ] 

Cody Koeninger commented on SPARK-18386:


That should work.  There may be dependency conflicts trying to put a 0.10.1 jar 
in the same job as a 0.10.0, though.

> Batch mode SQL source for Kafka
> ---
>
> Key: SPARK-18386
> URL: https://issues.apache.org/jira/browse/SPARK-18386
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>    Reporter: Cody Koeninger
>
> An SQL equivalent to the DStream KafkaUtils.createRDD would be useful for 
> querying over a defined batch of offsets.
> The possibility of Kafka 0.10.1 time indexing (e.g. a batch from timestamp X 
> to timestamp Y) should be taken into account, even if not available in the 
> initial implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.1.0

2016-11-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15654295#comment-15654295
 ] 

Cody Koeninger commented on SPARK-18057:


I definitely do not want another copy-paste situation, we've already got too 
many of them.

I'm hoping that 0.10.1 is close enough to 0.10.0 that dependency issues can be 
worked out in a more satisfactory way (e.g. kafka is marked as provided, the 
0.10.1 integration jar depends on the 0.10 integration jar and just adds 
methods for time indexing) but I haven't really had time to look at it.

> Update structured streaming kafka from 10.0.1 to 10.1.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>    Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






Re: Akka Stream as the source for Spark Streaming. Please advice...

2016-11-10 Thread Cody Koeninger
The basic structured streaming source for Kafka is already committed to
master, build it and try it out.

If you're already using Kafka I don't really see much point in trying to
put Akka in between it and Spark.

On Nov 10, 2016 02:25, "vincent gromakowski" 
wrote:

I have already integrated common actors. I am also interested, specially to
see how we can achieve end to end back pressure.

2016-11-10 8:46 GMT+01:00 shyla deshpande :

> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
> Spark Streaming and Cassandra using Structured Streaming. But the kafka
> source support for Structured Streaming is not yet available. So now I am
> trying to use Akka Stream as the source to Spark Streaming.
>
> Want to make sure I am heading in the right direction. Please direct me to
> any sample code and reading material for this.
>
> Thanks
>
>


Re: Connectors using new Kafka consumer API

2016-11-09 Thread Cody Koeninger
Ok... in general it seems to me like effort would be better spent
trying to help upstream, as opposed to us making a 5th slightly
different interface to kafka (currently have 0.8 receiver, 0.8
dstream, 0.10 dstream, 0.10 structured stream)

On Tue, Nov 8, 2016 at 10:05 PM, Mark Grover <m...@apache.org> wrote:
> I think they are open to others helping, in fact, more than one person has
> worked on the JIRA so far. And, it's been crawling really slowly and that's
> preventing adoption of Spark's new connector in secure Kafka environments.
>
> On Tue, Nov 8, 2016 at 7:59 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Have you asked the assignee on the Kafka jira whether they'd be
>> willing to accept help on it?
>>
>> On Tue, Nov 8, 2016 at 5:26 PM, Mark Grover <m...@apache.org> wrote:
>> > Hi all,
>> > We currently have a new direct stream connector, thanks to work by Cody
>> > and others on SPARK-12177.
>> >
>> > However, that can't be used in secure clusters that require Kerberos
>> > authentication. That's because Kafka currently doesn't support
>> > delegation tokens (KAFKA-1696). Unfortunately, very little work has been
>> > done on that JIRA, so, in my opinion, folks who want to use secure Kafka
>> > (using the norm - Kerberos) can't do so because Spark Streaming can't
>> > consume from it today.
>> >
>> > The right way is, of course, to get delegation tokens in Kafka but
>> > honestly I don't know if that's happening in the near future. I am
>> > wondering if we should consider something to remedy this - for example,
>> > we could come up with a receiver based connector based on the new Kafka
>> > consumer API that'd support kerberos authentication. It won't require
>> > delegation tokens since there's only a very small number of executors
>> > talking to Kafka. Of course, anyone who cares about high throughput and
>> > other direct connector benefits would have to use the direct connector.
>> > Another thing we could do is ship the keytab to the executors in the
>> > direct connector, so delegation tokens are not required, but the latter
>> > would be a pretty compromising solution, and I'd prefer not doing that.
>> >
>> > What do folks think? Would love to hear your thoughts, especially about
>> > the receiver.
>> >
>> > Thanks!
>> > Mark
>
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



[jira] [Created] (SPARK-18386) Batch mode SQL source for Kafka

2016-11-09 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-18386:
--

 Summary: Batch mode SQL source for Kafka
 Key: SPARK-18386
 URL: https://issues.apache.org/jira/browse/SPARK-18386
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Reporter: Cody Koeninger


An SQL equivalent to the DStream KafkaUtils.createRDD would be useful for 
querying over a defined batch of offsets.

The possibility of Kafka 0.10.1 time indexing (e.g. a batch from timestamp X to 
timestamp Y) should be taken into account, even if not available in the initial 
implementation.






Re: Connectors using new Kafka consumer API

2016-11-08 Thread Cody Koeninger
Have you asked the assignee on the Kafka jira whether they'd be
willing to accept help on it?

On Tue, Nov 8, 2016 at 5:26 PM, Mark Grover <m...@apache.org> wrote:
> Hi all,
> We currently have a new direct stream connector, thanks to work by Cody and
> others on SPARK-12177.
>
> However, that can't be used in secure clusters that require Kerberos
> authentication. That's because Kafka currently doesn't support delegation
> tokens (KAFKA-1696). Unfortunately, very little work has been done on that
> JIRA, so, in my opinion, folks who want to use secure Kafka (using the norm
> - Kerberos) can't do so because Spark Streaming can't consume from it today.
>
> The right way is, of course, to get delegation tokens in Kafka but honestly
> I don't know if that's happening in the near future. I am wondering if we
> should consider something to remedy this - for example, we could come up
> with a receiver based connector based on the new Kafka consumer API that'd
> support kerberos authentication. It won't require delegation tokens since
> there's only a very small number of executors talking to Kafka. Of course,
> anyone who cares about high throughput and other direct connector
> benefits would have to use the direct connector. Another thing we could do is
> ship the keytab to the executors in the direct connector, so delegation
> tokens are not required, but the latter would be a pretty compromising
> solution, and I'd prefer not doing that.
>
> What do folks think? Would love to hear your thoughts, especially about the
> receiver.
>
> Thanks!
> Mark




[jira] [Commented] (SPARK-18371) Spark Streaming backpressure bug - generates a batch with large number of records

2016-11-08 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649638#comment-15649638
 ] 

Cody Koeninger commented on SPARK-18371:


Thanks for digging into this.  The other thing I noticed when working on

https://github.com/apache/spark/pull/15132

is that the return value of getLatestRate was cast to Int, which seems wrong 
and possibly subject to overflow.

If you have the ability to test that PR (shouldn't require a spark redeploy, 
since the kafka jar is standalone), may want to test it out.
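
Editor's sketch of the overflow concern raised in this comment. It does not reproduce the Spark source (which is Scala); the class and method names are hypothetical. It only shows, in plain Java, what happens when a 64-bit rate is narrowed to a 32-bit int, and one way to clamp instead of wrapping.

```java
// Hypothetical illustration of narrowing a long rate to int; not Spark code.
final class RateNarrowing {
    /** Plain narrowing cast: keeps only the low 32 bits, so large values wrap. */
    static int unsafeCast(long rate) {
        return (int) rate;
    }

    /** Clamps to the int range before narrowing, so large values saturate. */
    static int clamp(long rate) {
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, rate));
    }

    public static void main(String[] args) {
        long rate = 3_000_000_000L; // larger than Integer.MAX_VALUE
        System.out.println("cast:  " + unsafeCast(rate));
        System.out.println("clamp: " + clamp(rate));
    }
}
```

A wrapped value can come out negative, which is the kind of result that would make a per-batch limit meaningless; whether that is what happens in the code under discussion is for the linked PR to establish.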

> Spark Streaming backpressure bug - generates a batch with large number of 
> records
> -
>
> Key: SPARK-18371
> URL: https://issues.apache.org/jira/browse/SPARK-18371
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0
>Reporter: mapreduced
> Attachments: GiantBatch2.png, GiantBatch3.png, 
> Giant_batch_at_23_00.png, Look_at_batch_at_22_14.png
>
>
> When the streaming job is configured with backpressureEnabled=true, it 
> generates a GIANT batch of records if the processing time + scheduled delay 
> is (much) larger than batchDuration. This creates a backlog of records like 
> no other and results in batches queueing for hours until it chews through 
> this giant batch.
> Expectation is that it should reduce the number of records per batch in some 
> time to whatever it can really process.
> Attaching some screen shots where it seems that this issue is quite easily 
> reproducible.






Re: Spark Improvement Proposals

2016-11-08 Thread Cody Koeninger
So there are some minor things (the Where section heading appears to
be dropped; wherever this document is posted it needs to actually link
to a jira filter showing current / past SIPs) but it doesn't look like
I can comment on the google doc.

The major substantive issue that I have is that this version is
significantly less clear as to the outcome of an SIP.

The apache example of lazy consensus at
http://apache.org/foundation/voting.html#LazyConsensus involves an
explicit announcement of an explicit deadline, which I think are
necessary for clarity.



On Mon, Nov 7, 2016 at 1:55 PM, Reynold Xin <r...@databricks.com> wrote:
> It turned out suggested edits (trackable) don't show up for non-owners, so
> I've just merged all the edits in place. It should be visible now.
>
> On Mon, Nov 7, 2016 at 10:10 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>> Oops. Let me try figure that out.
>>
>>
>> On Monday, November 7, 2016, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>> Thanks for picking up on this.
>>>
>>> Maybe I fail at google docs, but I can't see any edits on the document
>>> you linked.
>>>
>>> Regarding lazy consensus, if the board in general has less of an issue
>>> with that, sure.  As long as it is clearly announced, lasts at least
>>> 72 hours, and has a clear outcome.
>>>
>>> The other points are hard to comment on without being able to see the
>>> text in question.
>>>
>>>
>>> On Mon, Nov 7, 2016 at 3:11 AM, Reynold Xin <r...@databricks.com> wrote:
>>> > I just looked through the entire thread again tonight - there are a lot
>>> > of great ideas being discussed. Thanks Cody for taking the first crack at
>>> > the proposal.
>>> >
>>> > I want to first comment on the context. Spark is one of the most
>>> > innovative and important projects in (big) data -- overall technical
>>> > decisions made in Apache Spark are sound. But of course, a project as
>>> > large and active as Spark always has room for improvement, and we as a
>>> > community should strive to take it to the next level.
>>> >
>>> > To that end, the two biggest areas for improvements in my opinion are:
>>> >
>>> > 1. Visibility: There is so much happening that it is difficult to know
>>> > what really is going on. For people that don't follow closely, it is
>>> > difficult to know what the important initiatives are. Even for people
>>> > that do follow, it is difficult to know what specific things require
>>> > their attention, since the number of pull requests and JIRA tickets is
>>> > high and it's difficult to extract signal from noise.
>>> >
>>> > 2. Solicit user (broadly defined, including developers themselves)
>>> > input more proactively: At the end of the day the project provides value
>>> > because users use it. Users can't tell us exactly what to build, but it
>>> > is important to get their inputs.
>>> >
>>> >
>>> > I've taken Cody's doc and edited it:
>>> >
>>> > https://docs.google.com/document/d/1-Zdi_W-wtuxS9hTK0P9qb2x-nRanvXmnZ7SUi4qMljg/edit#heading=h.36ut37zh7w2b
>>> > (I've made all my modifications trackable)
>>> >
>>> > There are a couple of high level changes I made:
>>> >
>>> > 1. I've consulted a board member and he recommended lazy consensus as
>>> > opposed to voting. The reason being in voting there can easily be a
>>> > "loser" that gets outvoted.
>>> >
>>> > 2. I made it lighter weight, and renamed "strategy" to "optional design
>>> > sketch". Echoing one of the earlier email: "IMHO so far aside from
>>> > tagging things and linking them elsewhere simply having design docs and
>>> > prototypes implementations in PRs is not something that has not worked
>>> > so far".
>>> >
>>> > 3. I made some language tweaks to focus more on visibility. For
>>> > example, "The purpose of an SIP is to inform and involve", rather than
>>> > just "involve". SIPs should also have at least two emails that go to dev@.
>>> >
>>> >
>>> > Wh

Re: Kafka stream offset management question

2016-11-08 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

specifically

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets

Have you set enable.auto.commit to false?

The new consumer stores offsets in kafka, so the idea of specifically
deleting offsets for that group doesn't really make sense.

In other words

- set enable.auto.commit to false
- use a new group.id
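
Editor's sketch of the two bullet points above as a consumer parameter map. The config keys are standard Kafka consumer settings; the helper name, broker address, and the way the group id suffix is chosen are assumptions for illustration only. The map is what would typically be handed to the Kafka 0.10 direct stream as its Kafka parameters.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds Kafka consumer parameters for a replay
// from the earliest available offsets under a previously unused group id.
final class ConsumerParams {
    static Map<String, Object> freshGroupParams(String bootstrapServers,
                                                String groupPrefix,
                                                long suffix) {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", bootstrapServers);
        params.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // A new group id has no committed offsets, so auto.offset.reset applies.
        params.put("group.id", groupPrefix + "-" + suffix);
        params.put("auto.offset.reset", "earliest");
        // Do not let the consumer commit on its own schedule.
        params.put("enable.auto.commit", false);
        return params;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "replay" are placeholder values.
        System.out.println(freshGroupParams("localhost:9092", "replay", 42L));
    }
}
```

The suffix could be a timestamp or a run counter; the only requirement implied by the advice above is that the resulting group.id has not been used before.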


On Tue, Nov 8, 2016 at 2:21 AM, Haopu Wang  wrote:
> I'm using Kafka direct stream (auto.offset.reset = earliest) and enable
> Spark streaming's checkpoint.
>
>
>
> The application starts and consumes messages correctly. Then I stop the
> application and clean the checkpoint folder.
>
>
>
> I restart the application and expect it to consumes old messages. But it
> doesn't consume any data. And there are logs as below:
>
>
>
>  [org.apache.spark.streaming.kafka010.KafkaRDD] (Executor task
> launch worker-0;) Beginning offset 25 is the same as ending offset skipping
> aa 0
>
>
>
> So I think the offset is stored not only in checkpoint but also in Kafka,
> right?
>
> Is it because I'm using the same group.id? How can I delete the consumer
> group manually?
>
>
>
> Thanks again for any help!
>
>




Re: Using Apache Spark Streaming - how to handle changing data format within stream

2016-11-07 Thread Cody Koeninger
I may be misunderstanding, but you need to take each kafka message,
and turn it into multiple items in the transformed rdd?

so something like (pseudocode):

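stream.flatMap { message =>
  val items = new ArrayBuffer[Record]   // Record: placeholder for whatever parser.parse returns
  var parser: Parser = null             // Parser: placeholder type, set when a header is seen
  message.split("\n").foreach { line =>
    if (line.startsWith("#"))           // it's a header
      parser = someParserBasedOn(line)
    else
      items += parser.parse(line)
  }
  items.iterator
}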
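
Editor's sketch: a self-contained Java version of the per-message logic in the pseudocode above, with no Spark dependency so it can be run on its own. The class name and the choice of a field-name-to-value map as the record type are assumptions; the "#fields" header format is taken from the question quoted below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark or Kafka.
final class HeaderBlockParser {
    private static final String HEADER_PREFIX = "#fields";

    /** Splits one message into records, pairing each data line with the most recent header. */
    static List<Map<String, String>> parse(String message) {
        List<Map<String, String>> records = new ArrayList<>();
        String[] fieldNames = null; // no header seen yet
        for (String line : message.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] tokens = trimmed.split("\\s+");
            if (tokens[0].equals(HEADER_PREFIX)) {
                // A new header replaces the field names for the lines that follow.
                fieldNames = Arrays.copyOfRange(tokens, 1, tokens.length);
            } else if (fieldNames != null) {
                Map<String, String> record = new LinkedHashMap<>();
                int n = Math.min(fieldNames.length, tokens.length);
                for (int i = 0; i < n; i++) {
                    record.put(fieldNames[i], tokens[i]);
                }
                records.add(record);
            }
            // Data lines that appear before any header are skipped.
        }
        return records;
    }

    public static void main(String[] args) {
        String message = "#fields field1 field2 field3\ndata1 data2 data3\n"
                + "#fields field4 field5 field6 field7\ndata4 data5 data6 data7\n";
        for (Map<String, String> record : parse(message)) {
            System.out.println(record);
        }
    }
}
```

Because the header state lives entirely inside one call, the function can be applied to each Kafka message independently on the executors (for example from a flatMap over the stream, returning the list's iterator), and nothing has to be collected back to the driver. This only holds if a header and its data lines always arrive in the same message, as in the example given in the question.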

On Mon, Nov 7, 2016 at 4:22 PM, coolgar  wrote:
> I'm using Apache Spark Streaming with the Kafka direct consumer. The data
> stream I'm receiving is log data that includes a header with each block of
> messages. Each DStream can therefore have many blocks of messages, each with
> its own header.
>
> The header is used to know how to interpret the following fields in the
> block of messages. My challenge is that I'm building up (K,V) pairs that are
> processed by reduceByKey() and I use this header to know how to parse the
> fields that follow the header into the (K,V) pairs.
>
> So each message received by Kafka may appear as follows (# denotes the
> header field, \n denotes new line):
> #fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5
> field6 field7\ndata4 data5 data6 data7\n...
>
> Is there a way, without collecting all data back to the driver, to "grab"
> the header and use it to subsequently process the messages that follow the
> header until a new #fields comes along, rinse, repeat?
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>




Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-07 Thread Cody Koeninger
There definitely is Kafka documentation indicating that you should use
a different consumer group for logically different subscribers, this
is really basic to Kafka:

http://kafka.apache.org/documentation#intro_consumers

As for your comment that "commit async after each RDD, which is not
really viable also", how is it not viable?  Again, committing offsets
to Kafka doesn't give you reliable delivery semantics unless your
downstream data store is idempotent.  If your downstream data store is
idempotent, then it shouldn't matter to you when offset commits
happen, as long as they happen within a reasonable time after the data
is written.

Do you want to keep arguing with me, or follow my advice and proceed
with debugging any remaining issues after you make the changes I
suggested?

On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <i...@vadio.com> wrote:
> With our stream version, we update the offsets for only the partition we are
> operating on. We even break down the partition into smaller batches and then
> update the offsets after each batch within the partition. With Spark 1.6 and
> Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not
> necessarily a Spark issue since Kafka no longer allows you to simply update
> the offsets for a given consumer group. You have to subscribe or assign
> partitions to even do so.
>
> As for storing the offsets in some other place like a DB, I don't find this
> useful because you then can't use tools like Kafka Manager. In order to do
> so you would have to store in a DB and then circle back and update Kafka
> afterwards. This means you have to keep two sources in sync which is not
> really a good idea.
>
> It is a challenge in Spark to use the Kafka offsets since the driver keeps
> subscribed to the topic(s) and consumer group, while the executors prepend
> "spark-executor-" to the consumer group. The stream (driver) does allow you
> to commit async after each RDD, which is not really viable also. I have thought
> of implementing an Akka actor system on the driver and sending it messages from
> the executor code to update the offsets, but then that is asynchronous as
> well so not really a good solution.
>
> I have no idea why Kafka made this change and also why in the parallel
> KafkaRDD application we would be advised to use different consumer groups
> for each RDD. That seems strange to me that different consumer groups would
> be required or advised. There is no Kafka documentation that I know of that
> states this. The biggest issue I see with the parallel KafkaRDD is the
> timeouts. I have tried to set poll.ms to 30 seconds and still get the issue.
> Something is not right here and just does not seem right. As I mentioned with the
> streaming application, with Spark 1.6 and Kafka 0.8.x we never saw this
> issue. We have been running the same basic logic for over a year now without
> one hitch at all.
>
> Ivan
>
>
> On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Someone can correct me, but I'm pretty sure Spark dstreams (in
>> general, not just kafka) have been progressing on to the next batch
>> after a given batch aborts for quite some time now.  Yet another
>> reason I put offsets in my database transactionally.  My jobs throw
>> exceptions if the offset in the DB isn't what I expected it to be.
>>
>>
>>
>>
>> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <grap...@graphex.com> wrote:
>> > I've been encountering the same kinds of timeout issues as Ivan, using
>> > the "Kafka Stream" approach that he is using, except I'm storing my offsets
>> > manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet
>> > implemented the KafkaRDD approach, and therefore don't have the concurrency
>> > issues, but a very similar use case is coming up for me soon, it's just
>> > been backburnered until I can get streaming to be more reliable (I will
>> > definitely ensure unique group IDs when I do). Offset commits are certainly
>> > more painful in Kafka 0.10, and that doesn't have anything to do with
>> > Spark.
>> >
>> > While I may be able to alleviate the timeout by just increasing it, I've
>> > noticed something else that is more worrying: When one task fails 4 times
>> > in a row (i.e. "Failed to get records for _ after polling for _"), Spark
>> > aborts the Stage and Job with "Job aborted due to stage failure: Task _ in
>> > stage _ failed 4 times". That's fine, and it's the behavior I want, but
>> > instead of stopping the Application there (as previous versions of Spark
>> > did) the next

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-07 Thread Cody Koeninger
Someone can correct me, but I'm pretty sure Spark dstreams (in
general, not just kafka) have been progressing on to the next batch
after a given batch aborts for quite some time now.  Yet another
reason I put offsets in my database transactionally.  My jobs throw
exceptions if the offset in the DB isn't what I expected it to be.
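
For illustration, the "offsets in the database, transactionally" approach
described above could be sketched roughly as follows. This is a minimal
in-memory sketch under stated assumptions, not code from this thread:
OffsetStore, commit and the OffsetRange case class (a stand-in, not Spark's
own class) are hypothetical names, and a real job would wrap the same two
writes in one database transaction.

```scala
// Stand-in for Spark's offset range; hypothetical, defined here so the sketch is self-contained.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// In-memory stand-in for a transactional store (assumption: a real job would use a DB transaction).
final class OffsetStore {
  private var offsets = Map.empty[(String, Int), Long]
  private var results = Vector.empty[String]

  def currentOffset(topic: String, partition: Int): Long = synchronized {
    offsets.getOrElse((topic, partition), 0L)
  }

  def storedResults: Vector[String] = synchronized(results)

  // Results and the new offset are applied together, or not at all.
  def commit(range: OffsetRange, batchResults: Seq[String]): Unit = synchronized {
    val key = (range.topic, range.partition)
    val expected = offsets.getOrElse(key, 0L)
    // Fail loudly if a batch was skipped or replayed instead of silently moving on.
    if (expected != range.fromOffset) {
      throw new IllegalStateException(
        s"Offset mismatch for $key: store has $expected, batch starts at ${range.fromOffset}")
    }
    results = results ++ batchResults
    offsets = offsets.updated(key, range.untilOffset)
  }
}
```

With this shape, a microbatch that arrives after a failed one starts at an
offset the store does not expect, so commit throws rather than recording
progress past the gap.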




On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <grap...@graphex.com> wrote:
> I've been encountering the same kinds of timeout issues as Ivan, using the 
> "Kafka Stream" approach that he is using, except I'm storing my offsets 
> manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet 
> implemented the KafkaRDD approach, and therefore don't have the concurrency 
> issues, but a very similar use case is coming up for me soon, it's just been 
> backburnered until I can get streaming to be more reliable (I will definitely 
> ensure unique group IDs when I do). Offset commits are certainly more painful 
> in Kafka 0.10, and that doesn't have anything to do with Spark.
>
> While I may be able to alleviate the timeout by just increasing it, I've
> noticed something else that is more worrying: When one task fails 4 times in 
> a row (i.e. "Failed to get records for _ after polling for _"), Spark aborts 
> the Stage and Job with "Job aborted due to stage failure: Task _ in stage _ 
> failed 4 times". That's fine, and it's the behavior I want, but instead of 
> stopping the Application there (as previous versions of Spark did) the next 
> microbatch marches on and offsets are committed ahead of the failed 
> microbatch. Suddenly my at-least-once app becomes more 
> sometimes-at-least-once which is no good. In order for spark to display that 
> failure, I must be propagating the errors up to Spark, but the behavior of 
> marching forward with the next microbatch seems to be new, and a big 
> potential for data loss in streaming applications.
>
> Am I perhaps missing a setting to stop the entire streaming application once 
> spark.task.maxFailures is reached? Has anyone else seen this behavior of a 
> streaming application skipping over failed microbatches?
>
> Thanks,
> Sean
>
>
>> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> So basically what I am saying is
>>
>> - increase poll.ms
>> - use a separate group id everywhere
>> - stop committing offsets under the covers
>>
>> That should eliminate all of those as possible causes, and then we can
>> see if there are still issues.
>>
>> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>> subscribe to a topic in order to update offsets, Kafka does.  If you
>> don't like the new Kafka consumer api, the existing 0.8 simple
>> consumer api should be usable with later brokers.  As long as you
>> don't need SSL or dynamic subscriptions, and it meets your needs, keep
>> using it.
>>
>> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <i...@vadio.com> wrote:
>>> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a
>>> single distinct topic. For example, the group would be something like
>>> "storage-group", and the topics would be "storage-channel1", and
>>> "storage-channel2". In each thread a KafkaConsumer is started, assigned the
>>> topic's partitions, and then offsets are committed after the RDD is
>>> processed. This should not interfere with the consumer group used by the
>>> executors which would be "spark-executor-storage-group".
>>>
>>> In the streaming example there is a single topic ("client-events") and group
>>> ("processing-group"). A single stream is created and offsets are manually
>>> updated from the executor after each partition is handled. This was a
>>> challenge since Spark now requires one to assign or subscribe to a topic in
>>> order to even update the offsets. In 0.8.2.x you did not have to worry about
>>> that. This approach limits your exposure to duplicate data since idempotent
>>> records are not entirely possible in our scenario. At least without a lot of
>>> re-running of logic to de-dup.
>>>
>>> Thanks,
>>>
>>> Ivan
>>>
>>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>> So just to be clear, the answers to my questions are
>>>>
>>>> - you are not using different group ids, you're using the same group
>>>> id everywhere
>>>>
>>>> - you are committing offsets manually
>>>>
>>>> R

Re: Odp.: Spark Improvement Proposals

2016-11-07 Thread Cody Koeninger
Thanks for picking up on this.

Maybe I fail at google docs, but I can't see any edits on the document
you linked.

Regarding lazy consensus, if the board in general has less of an issue
with that, sure.  As long as it is clearly announced, lasts at least
72 hours, and has a clear outcome.

The other points are hard to comment on without being able to see the
text in question.


On Mon, Nov 7, 2016 at 3:11 AM, Reynold Xin <r...@databricks.com> wrote:
> I just looked through the entire thread again tonight - there are a lot of
> great ideas being discussed. Thanks Cody for taking the first crack at the
> proposal.
>
> I want to first comment on the context. Spark is one of the most innovative
> and important projects in (big) data -- overall technical decisions made in
> Apache Spark are sound. But of course, a project as large and active as
> Spark always has room for improvement, and we as a community should strive
> to take it to the next level.
>
> To that end, the two biggest areas for improvement in my opinion are:
>
> 1. Visibility: There is so much happening that it is difficult to know what
> really is going on. For people that don't follow closely, it is difficult to
> know what the important initiatives are. Even for people that do follow, it
> is difficult to know what specific things require their attention, since the
> number of pull requests and JIRA tickets is high and it's difficult to
> extract signal from noise.
>
> 2. Solicit user (broadly defined, including developers themselves) input
> more proactively: At the end of the day the project provides value because
> users use it. Users can't tell us exactly what to build, but it is important
> to get their inputs.
>
>
> I've taken Cody's doc and edited it:
> https://docs.google.com/document/d/1-Zdi_W-wtuxS9hTK0P9qb2x-nRanvXmnZ7SUi4qMljg/edit#heading=h.36ut37zh7w2b
> (I've made all my modifications trackable)
>
> There are a couple of high-level changes I made:
>
> 1. I've consulted a board member and he recommended lazy consensus as
> opposed to voting. The reason being in voting there can easily be a "loser"
> that gets outvoted.
>
> 2. I made it lighter weight, and renamed "strategy" to "optional design
> sketch". Echoing one of the earlier email: "IMHO so far aside from tagging
> things and linking them elsewhere simply having design docs and prototypes
> implementations in PRs is not something that has not worked so far".
>
> 3. I made some language tweaks to focus more on visibility. For example,
> "The purpose of an SIP is to inform and involve", rather than just
> "involve". SIPs should also have at least two emails that go to dev@.
>
>
> While I was editing this, I thought we really needed a suggested template
> for design doc too. I will get to that too ...
>
>
> On Tue, Nov 1, 2016 at 12:09 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>> Most things looked OK to me too, although I do plan to take a closer look
>> after Nov 1st when we cut the release branch for 2.1.
>>
>>
>> On Mon, Oct 31, 2016 at 3:12 PM, Marcelo Vanzin <van...@cloudera.com>
>> wrote:
>>>
>>> The proposal looks OK to me. I assume, even though it's not explicitly
>>> called, that voting would happen by e-mail? A template for the
>>> proposal document (instead of just a bullet list) would also be nice,
>>> but that can be done at any time.
>>>
>>> BTW, shameless plug: I filed SPARK-18085 which I consider a candidate
>>> for a SIP, given the scope of the work. The document attached even
>>> somewhat matches the proposed format. So if anyone wants to try out
>>> the process...
>>>
>>> On Mon, Oct 31, 2016 at 10:34 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>> > Now that spark summit europe is over, are any committers interested in
>>> > moving forward with this?
>>> >
>>> >
>>> > https://github.com/koeninger/spark-1/blob/SIP-0/docs/spark-improvement-proposals.md
>>> >
>>> > Or are we going to let this discussion die on the vine?
>>> >
>>> > On Mon, Oct 17, 2016 at 10:05 AM, Tomasz Gawęda
>>> > <tomasz.gaw...@outlook.com> wrote:
>>> >> Maybe my mail was not clear enough.
>>> >>
>>> >>
>>> >> I didn't want to write "lets focus on Flink" or any other framework.
>>> >> The
>>> >> idea with benchmarks was to show two things:
>>> >>
>>> >> - why some people are doing bad PR for Spark
>>> >>
>>> >

Re: expected behavior of Kafka dynamic topic subscription

2016-11-07 Thread Cody Koeninger
https://issues.apache.org/jira/browse/SPARK-18272

I couldn't speculate on what the issue might be without more info.  If
you have time to write a test for that ticket, I'd encourage you to do
so, I'm not certain how soon I'll be able to get to it.

On Sun, Nov 6, 2016 at 7:31 PM, Haopu Wang <hw...@qilinsoft.com> wrote:
> Cody, thanks for the response. Do you think it's a Spark issue or Kafka 
> issue? Can you please let me know the jira ticket number?
>
> -Original Message-
> From: Cody Koeninger [mailto:c...@koeninger.org]
> Sent: November 4, 2016 22:35
> To: Haopu Wang
> Cc: user@spark.apache.org
> Subject: Re: expected behavior of Kafka dynamic topic subscription
>
> That's not what I would expect from the underlying kafka consumer, no.
>
> But this particular case (no matching topics, then add a topic after
> SubscribePattern stream starts) actually isn't part of unit tests for
> either the DStream or the structured stream.
>
> I'll make a jira ticket.
>
> On Thu, Nov 3, 2016 at 9:43 PM, Haopu Wang <hw...@qilinsoft.com> wrote:
>> I'm using the Kafka010 integration API to create a DStream using the
>> SubscribePattern ConsumerStrategy.
>>
>> The specified topic doesn't exist when I start the application.
>>
>> Then I create the topic and publish some test messages. I can see them in
>> the console subscriber.
>>
>> But the spark application doesn't seem to get the messages.
>>
>> I think this is not expected, right? What should I check to resolve it?
>>
>> Thank you very much!

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[jira] [Commented] (SPARK-18258) Sinks need access to offset representation

2016-11-04 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15638555#comment-15638555
 ] 

Cody Koeninger commented on SPARK-18258:


Sure, added, let me know if I'm missing something or can clarify.

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>    Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset 
> identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not 
> the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as 
> the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for 
> the starting and ending offsets (either the offsets themselves, or the 
> SPARK-17829 string/json representation).  That would be an API change, but if 
> there's another way to map batch ids to offset representations without 
> changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a 
> job as e.g. the Kafka dstream gives, because Sinks are the main place that 
> should need them.
> After SPARK-17829 is complete and offsets have a .json method, an api for 
> this ticket might look like
> {code}
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
> }
> {code}
> where start and end were provided by StreamExecution.runBatch using 
> committedOffsets and availableOffsets.  
> I'm not 100% certain that the offsets in the seq could always be mapped back 
> to the correct source when restarting complicated multi-source jobs, but I 
> think it'd be sufficient.  Passing the string/json representation of the seq 
> instead of the seq itself would probably be sufficient as well, but the 
> convention of rendering a None as "-" in the json is maybe a little 
> idiosyncratic to parse, and the constant defining that is private.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18258) Sinks need access to offset representation

2016-11-04 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-18258:
---
Description: 
Transactional "exactly-once" semantics for output require storing an offset 
identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the 
actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the 
results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for 
the starting and ending offsets (either the offsets themselves, or the 
SPARK-17829 string/json representation).  That would be an API change, but if 
there's another way to map batch ids to offset representations without changing 
the Sink api that would work as well.  

I'm assuming we don't need the same level of access to offsets throughout a job 
as e.g. the Kafka dstream gives, because Sinks are the main place that should 
need them.

After SPARK-17829 is complete and offsets have a .json method, an api for this 
ticket might look like

{code}
trait Sink {
  def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: OffsetSeq): Unit
}
{code}

where start and end were provided by StreamExecution.runBatch using 
committedOffsets and availableOffsets.  

I'm not 100% certain that the offsets in the seq could always be mapped back to 
the correct source when restarting complicated multi-source jobs, but I think 
it'd be sufficient.  Passing the string/json representation of the seq instead 
of the seq itself would probably be sufficient as well, but the convention of 
rendering a None as "-" in the json is maybe a little idiosyncratic to parse, 
and the constant defining that is private.

  was:
Transactional "exactly-once" semantics for output require storing an offset 
identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the 
actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the 
results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for 
the starting and ending offsets (either the offsets themselves, or the 
SPARK-17829 string/json representation).  That would be an API change, but if 
there's another way to map batch ids to offset representations without changing 
the Sink api that would work as well.  

I'm assuming we don't need the same level of access to offsets throughout a job 
as e.g. the Kafka dstream gives, because Sinks are the main place that should 
need them.



Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Cody Koeninger
So basically what I am saying is

- increase poll.ms
- use a separate group id everywhere
- stop committing offsets under the covers

That should eliminate all of those as possible causes, and then we can
see if there are still issues.

As far as 0.8 vs 0.10, Spark doesn't require you to assign or
subscribe to a topic in order to update offsets, Kafka does.  If you
don't like the new Kafka consumer api, the existing 0.8 simple
consumer api should be usable with later brokers.  As long as you
don't need SSL or dynamic subscriptions, and it meets your needs, keep
using it.
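
As a rough sketch of the first two suggestions above: plain Maps stand in for
SparkConf and the Kafka consumer params so the snippet needs no Spark or Kafka
dependency. The helper name kafkaParamsFor, the "base-topic" naming scheme and
the 30000 ms value are illustrative assumptions, not recommendations from this
thread. The third suggestion (stop committing offsets under the covers) is a
code change rather than a setting, so it is not shown.

```scala
object KafkaSettings {
  // Executor-side poll timeout for the Kafka 0.10 integration; the thread started from 2000.
  val sparkSettings: Map[String, String] = Map(
    "spark.streaming.kafka.consumer.poll.ms" -> "30000" // assumed value, tune for your cluster
  )

  // Hypothetical helper: derive a distinct group id for every topic (and so every RDD/stream).
  def kafkaParamsFor(baseGroup: String, topic: String): Map[String, Object] =
    Map[String, Object](
      "bootstrap.servers" -> "somemachine:31000",
      "group.id" -> s"$baseGroup-$topic",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "auto.offset.reset" -> "earliest"
    )

  // One parameter map per topic, so no two RDDs share a group id.
  val perTopicParams: Map[String, Map[String, Object]] =
    Seq("storage-channel1", "storage-channel2")
      .map(topic => topic -> kafkaParamsFor("storage-group", topic))
      .toMap
}
```

Each map in perTopicParams would then be passed to its own createRDD or
createDirectStream call instead of sharing one kafkaParams everywhere.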

On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <i...@vadio.com> wrote:
> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a
> single distinct topic. For example, the group would be something like
> "storage-group", and the topics would be "storage-channel1", and
> "storage-channel2". In each thread a KafkaConsumer is started, assigned the
> topic's partitions, and then offsets are committed after the RDD is
> processed. This should not interfere with the consumer group used by the
> executors which would be "spark-executor-storage-group".
>
> In the streaming example there is a single topic ("client-events") and group
> ("processing-group"). A single stream is created and offsets are manually
> updated from the executor after each partition is handled. This was a
> challenge since Spark now requires one to assign or subscribe to a topic in
> order to even update the offsets. In 0.8.2.x you did not have to worry about
> that. This approach limits your exposure to duplicate data since idempotent
> records are not entirely possible in our scenario. At least without a lot of
> re-running of logic to de-dup.
>
> Thanks,
>
> Ivan
>
> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> So just to be clear, the answers to my questions are
>>
>> - you are not using different group ids, you're using the same group
>> id everywhere
>>
>> - you are committing offsets manually
>>
>> Right?
>>
>> If you want to eliminate network or kafka misbehavior as a source,
>> tune poll.ms upwards even higher.
>>
>> You must use different group ids for different rdds or streams.
>> Kafka consumers won't behave the way you expect if they are all in the
>> same group id, and the consumer cache is keyed by group id. Yes, the
>> executor will tack "spark-executor-" on to the beginning, but if you
>> give it the same base group id, it will be the same.  And the driver
>> will use the group id you gave it, unmodified.
>>
>> Finally, I really can't help you if you're manually writing your own
>> code to commit offsets directly to Kafka.  Trying to minimize
>> duplicates that way doesn't really make sense, your system must be
>> able to handle duplicates if you're using kafka as an offsets store,
>> it can't do transactional exactly once.
>>
>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <i...@vadio.com> wrote:
>> > Here are some examples and details of the scenarios. The KafkaRDD is the
>> > most error prone to polling timeouts and concurrent modification errors.
>> >
>> > *Using KafkaRDD* - This takes a list of channels and processes them in
>> > parallel using the KafkaRDD directly. They all use the same consumer
>> > group ('storage-group'), but each has its own topic and each topic has 4
>> > partitions. We routinely get timeout errors when polling for data. This
>> > occurs whether we process in parallel or sequentially.
>> >
>> > *Spark Kafka setting:*
>> > spark.streaming.kafka.consumer.poll.ms=2000
>> >
>> > *Kafka Consumer Params:*
>> > metric.reporters = []
>> > metadata.max.age.ms = 30
>> > partition.assignment.strategy =
>> > [org.apache.kafka.clients.consumer.RangeAssignor]
>> > reconnect.backoff.ms = 50
>> > sasl.kerberos.ticket.renew.window.factor = 0.8
>> > max.partition.fetch.bytes = 1048576
>> > bootstrap.servers = [somemachine:31000]
>> > ssl.keystore.type = JKS
>> > enable.auto.commit = false
>> > sasl.mechanism = GSSAPI
>> > interceptor.classes = null
>> > exclude.internal.topics = true
>> > ssl.truststore.password = null
>> > client.id =
>> > ssl.endpoint.identification.algorithm = null
>> > max.poll.records = 1000
>> > check.crcs = true
>> > request.timeout.ms = 4
>> > heartbeat.interval.ms = 3000
>> > auto.commit.int

[jira] [Commented] (SPARK-18258) Sinks need access to offset representation

2016-11-04 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637621#comment-15637621
 ] 

Cody Koeninger commented on SPARK-18258:


So one obvious one is that if wherever checkpoint data is being stored fails or 
is corrupted, my downstream database can still be fine and have correct 
results, yet I have no way of restarting the job from a known point because the 
batch id stored in the database is now meaningless.

Basically, I do not want to introduce another N points of failure in between 
Kafka and my data store.


[jira] [Commented] (SPARK-18258) Sinks need access to offset representation

2016-11-04 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637576#comment-15637576
 ] 

Cody Koeninger commented on SPARK-18258:


The sink doesn't have to reason about equality of the representations.

It just has to be able to store those representations, in addition to the batch id 
if necessary, so that the job can be recovered if spark fails in a way that 
renders the batch id meaningless or the user wants to switch to a different 
streaming system.
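
A minimal sketch of what "just store those representations" could look like,
assuming the sink is handed the start and end offsets as opaque strings. The
types below are stand-ins defined only for this example: Row,
OffsetAwareSink and InMemoryTransactionalSink are hypothetical names, not
Spark classes, and the in-memory state stands in for a transactional data
store.

```scala
// Stand-in row type so the sketch compiles without Spark.
final case class Row(value: String)

// Hypothetical sink shape: offsets arrive as opaque strings (e.g. a JSON rendering).
trait OffsetAwareSink {
  def addBatch(batchId: Long, data: Seq[Row], startOffsets: String, endOffsets: String): Unit
}

final class InMemoryTransactionalSink extends OffsetAwareSink {
  private var rows = Vector.empty[Row]
  // (batchId, start, end) for every batch written, kept next to the results.
  private var offsetLog = Vector.empty[(Long, String, String)]

  def storedRows: Vector[Row] = synchronized(rows)

  // The last end offsets written with results; a restart could resume from here
  // even if the batch id has become meaningless.
  def recoveryPoint: Option[String] = synchronized(offsetLog.lastOption.map(_._3))

  override def addBatch(batchId: Long, data: Seq[Row], startOffsets: String, endOffsets: String): Unit =
    synchronized {
      // Both writes happen under one lock, standing in for one transaction.
      // The sink never interprets or compares the offset strings.
      rows = rows ++ data
      offsetLog = offsetLog :+ ((batchId, startOffsets, endOffsets))
    }
}
```

The sink only stores and returns the strings; interpreting them on restart
would be left to whichever streaming system reads them back.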


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Cody Koeninger
So just to be clear, the answers to my questions are

- you are not using different group ids, you're using the same group
id everywhere

- you are committing offsets manually

Right?

If you want to eliminate network or kafka misbehavior as a source,
tune poll.ms upwards even higher.

You must use different group ids for different rdds or streams.
Kafka consumers won't behave the way you expect if they are all in the
same group id, and the consumer cache is keyed by group id. Yes, the
executor will tack "spark-executor-" on to the beginning, but if you
give it the same base group id, it will be the same.  And the driver
will use the group id you gave it, unmodified.
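
To make the group id point concrete, here is a small sketch. The
"spark-executor-" prefix is taken from the description above; the
uniqueGroupId naming scheme is only an assumption for illustration.

```scala
object GroupIds {
  // As described above: executors prefix the configured id, the driver uses it unmodified.
  def executorGroupId(configuredGroupId: String): String = s"spark-executor-$configuredGroupId"

  // Hypothetical scheme: one group id per topic, so each RDD or stream gets its own.
  def uniqueGroupId(base: String, topic: String): String = s"$base-$topic"
}

object GroupIdDemo {
  val topics = Seq("storage-channel1", "storage-channel2")

  // Same base id for every RDD: the prefix does not help, all executors share one group.
  val shared: Set[String] = topics.map(_ => GroupIds.executorGroupId("storage-group")).toSet

  // Distinct base ids: the executor-side group ids are distinct as well.
  val distinct: Set[String] =
    topics.map(t => GroupIds.executorGroupId(GroupIds.uniqueGroupId("storage-group", t))).toSet
}
```

Here shared collapses to a single group id while distinct keeps one per
topic, which is the difference between the setup in the original post and the
one being asked for.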

Finally, I really can't help you if you're manually writing your own
code to commit offsets directly to Kafka.  Trying to minimize
duplicates that way doesn't really make sense, your system must be
able to handle duplicates if you're using kafka as an offsets store,
it can't do transactional exactly once.

On Fri, Nov 4, 2016 at 1:48 PM, vonnagy  wrote:
> Here are some examples and details of the scenarios. The KafkaRDD is the most
> error prone to polling timeouts and concurrent modification errors.
>
> *Using KafkaRDD* - This takes a list of channels and processes them in
> parallel using the KafkaRDD directly. They all use the same consumer group
> ('storage-group'), but each has its own topic and each topic has 4
> partitions. We routinely get timeout errors when polling for data. This
> occurs whether we process in parallel or sequentially.
>
> *Spark Kafka setting:*
> spark.streaming.kafka.consumer.poll.ms=2000
>
> *Kafka Consumer Params:*
> metric.reporters = []
> metadata.max.age.ms = 30
> partition.assignment.strategy =
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [somemachine:31000]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id =
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 1000
> check.crcs = true
> request.timeout.ms = 4
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class
> com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
> group.id = storage-group
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> session.timeout.ms = 3
> metrics.num.samples = 2
> key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> auto.offset.reset = earliest
>
> *Example usage with KafkaRDD:*
> val channels = Seq("channel1", "channel2")
>
> channels.toParArray.foreach { channel =>
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>
>   // Get offsets for the given topic and the consumer group 'storage-group'
>   val offsetRanges = getOffsets("storage-group", channel)
>
>   val ds = KafkaUtils.createRDD[K, V](context,
>     kafkaParams.asJava,
>     offsetRanges,
>     PreferConsistent).toDS[V]
>
>   // Do some aggregations
>   ds.agg(...)
>   // Save the data
>   ds.write.mode(SaveMode.Append).parquet(somePath)
>   // Save offsets using a KafkaConsumer
>   consumer.commitSync(newOffsets.asJava)
>   consumer.close()
> }
>
>
> *Example usage with Kafka Stream:*
> This creates a stream and processes events in each partition. At the end of
> processing for each partition, we update the offsets for that partition.
> This is challenging to do, but is better than calling commitAsync on the
> stream, because that occurs after the /entire/ RDD has been processed. This
> method minimizes duplicates in an exactly once environment. Since the
> executors use their own custom group "spark-executor-processor-group" and the
> commit is buried in private functions we are unable to use the executors'
> cached consumer to update the offsets. This requires us to go through
> multiple steps to update the Kafka offsets accordingly.
>
> val offsetRanges = getOffsets("processor-group", "my-topic")
>
> val stream = KafkaUtils.createDirectStream[K, V](context,
>   PreferConsistent,
>   Subscribe[K, V](Seq("my-topic") 

Anyone want to weigh in on a Kafka DStreams api change?

2016-11-04 Thread Cody Koeninger
SPARK-17510

https://github.com/apache/spark/pull/15132

It's for allowing tweaking of rate limiting on a per-partition basis

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Continuous warning while consuming using new kafka-spark010 API

2016-11-04 Thread Cody Koeninger
I answered the duplicate post on the user mailing list, I'd say keep
the discussion there.

On Fri, Nov 4, 2016 at 12:14 PM, vonnagy  wrote:
> Nitin,
>
> I am getting similar issues using Spark 2.0.1 and Kafka 0.10. I have two
> jobs, one that uses a Kafka stream and one that uses just the KafkaRDD.
>
> With the KafkaRDD, I continually get the "Failed to get records". I have
> adjusted the polling with `spark.streaming.kafka.consumer.poll.ms` and the
> size of records with Kafka's `max.poll.records`. Even when it gets records
> it is extremely slow.
>
> When working with multiple KafkaRDDs in parallel I get the dreaded
> `ConcurrentModificationException`. The Spark logic is supposed to use a
> CachedKafkaConsumer based on the topic and partition. This is supposed to
> guarantee thread safety, but I continually get this error along with the
> polling timeout.
>
> Has anyone else tried to use Spark 2 with Kafka 0.10 and had any success? At
> this point it is completely useless in my experience. With Spark 1.6 and
> Kafka 0.8.x, I never had these problems.
>
>
>
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Continuous-warning-while-consuming-using-new-kafka-spark010-API-tp18987p19736.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
>




Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Cody Koeninger
- are you using different group ids for the different streams?
- are you manually committing offsets?
- what are the values of your kafka-related settings?
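
For reference on the second question, here is a minimal sketch of manual offset commits with the 0-10 direct stream, assuming the spark-streaming-kafka-0-10 artifact is on the classpath. The broker address, topic name, and group id are placeholders, and the println stands in for real output.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object ManualCommitSketch {
  def start(ssc: StreamingContext): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",            // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-stream-group",               // placeholder; use a distinct id per stream
      "enable.auto.commit" -> (false: java.lang.Boolean)  // commits are issued manually below
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("example-topic"), kafkaParams))

    stream.foreachRDD { rdd =>
      // Offset ranges are only available on the RDD produced directly by the stream.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Stand-in for real output; runs on the executors.
      rdd.foreach(record => println(s"${record.key} -> ${record.value}"))

      // Queue the commit once the output for this batch has completed.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
  }
}
```

Because the commit is only queued after the output step, a failure in between replays the batch, so this gives at-least-once delivery unless the output itself is idempotent.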

On Fri, Nov 4, 2016 at 12:20 PM, vonnagy  wrote:
> I am getting issues using Spark 2.0.1 and Kafka 0.10. I have two jobs,
> one that uses a Kafka stream and one that uses just the KafkaRDD.
>
> With the KafkaRDD, I continually get the "Failed to get records .. after
> polling". I have adjusted the polling with
> `spark.streaming.kafka.consumer.poll.ms` and the size of records with
> Kafka's `max.poll.records`. Even when it gets records it is extremely slow.
>
> When working with multiple KafkaRDDs in parallel I get the dreaded
> `ConcurrentModificationException`. The Spark logic is supposed to use a
> CachedKafkaConsumer based on the topic and partition. This is supposed to
> guarantee thread safety, but I continually get this error along with the
> polling timeout.
>
> Has anyone else tried to use Spark 2 with Kafka 0.10 and had any success? At
> this point it is completely useless in my experience. With Spark 1.6 and
> Kafka 0.8.x, I never had these problems.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>




[jira] [Created] (SPARK-18272) Test topic addition for subscribePattern on Kafka DStream and Structured Stream

2016-11-04 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-18272:
--

 Summary: Test topic addition for subscribePattern on Kafka DStream 
and Structured Stream
 Key: SPARK-18272
 URL: https://issues.apache.org/jira/browse/SPARK-18272
 Project: Spark
  Issue Type: Bug
  Components: DStreams, Structured Streaming
Reporter: Cody Koeninger


We've had reports of the following sequence

- create subscribePattern stream that doesn't match any existing topics at the 
time stream starts
- add a topic that matches pattern
- expect that messages from that topic show up, but they don't

We don't seem to actually have tests that cover this case, so we should add them
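
A minimal sketch of the stream setup this sequence refers to, assuming the spark-streaming-kafka-0-10 artifact. The broker address, group id, and the "events-.*" pattern are placeholders. The metadata.max.age.ms value is also an assumption: as far as I understand, the underlying consumer only re-evaluates the pattern when it refreshes metadata, so a shorter interval should shorten the wait before a newly created topic is noticed.

```scala
import java.util.regex.Pattern

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object PatternStreamSketch {
  def create(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",           // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "pattern-example-group",             // placeholder
      "auto.offset.reset" -> "earliest",
      "metadata.max.age.ms" -> "5000"                    // assumption: refresh metadata every 5 s
    )

    // No topic needs to match the pattern at the time the stream is created.
    KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      SubscribePattern[String, String](Pattern.compile("events-.*"), kafkaParams))
  }
}
```

A test for the reported case would start this stream with no matching topic, create one such as "events-a", publish to it, and assert that the records arrive.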



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: expected behavior of Kafka dynamic topic subscription

2016-11-04 Thread Cody Koeninger
That's not what I would expect from the underlying kafka consumer, no.

But this particular case (no matching topics, then add a topic after
SubscribePattern stream starts) actually isn't part of unit tests for
either the DStream or the structured stream.

I'll make a jira ticket.

On Thu, Nov 3, 2016 at 9:43 PM, Haopu Wang  wrote:
> I'm using the Kafka010 integration API to create a DStream using the
> SubscribePattern ConsumerStrategy.
>
> The specified topic doesn't exist when I start the application.
>
> Then I create the topic and publish some test messages. I can see them in
> the console subscriber.
>
> But the spark application doesn't seem to get the messages.
>
> I think this is not expected, right? What should I check to resolve it?
>
> Thank you very much!




[jira] [Updated] (SPARK-18258) Sinks need access to offset representation

2016-11-03 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-18258:
---
Description: 
Transactional "exactly-once" semantics for output require storing an offset 
identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the 
actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the 
results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for 
the starting and ending offsets (either the offsets themselves, or the 
SPARK-17829 string/json representation).  That would be an API change, but if 
there's another way to map batch ids to offset representations without changing 
the Sink api that would work as well.  

I'm assuming we don't need the same level of access to offsets throughout a job 
as e.g. the Kafka dstream gives, because Sinks are the main place that should 
need them.

  was:
Transactional "exactly-once" semantics for output require storing an offset 
identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the 
actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the
results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for 
the starting and ending offsets (either the offsets themselves, or the 
SPARK-17829 string/json representation).  That would be an API change, but if 
there's another way to map batch ids to offset representations without changing 
the Sink api that would work as well.  

I'm assuming we don't need the same level of access to offsets throughout a job 
as e.g. the Kafka dstream gives, because Sinks are the main place that should 
need them.


> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset 
> identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not 
> the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as 
> the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for 
> the starting and ending offsets (either the offsets themselves, or the 
> SPARK-17829 string/json representation).  That would be an API change, but if 
> there's another way to map batch ids to offset representations without 
> changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a 
> job as e.g. the Kafka dstream gives, because Sinks are the main place that 
> should need them.






[jira] [Created] (SPARK-18258) Sinks need access to offset representation

2016-11-03 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-18258:
--

 Summary: Sinks need access to offset representation
 Key: SPARK-18258
 URL: https://issues.apache.org/jira/browse/SPARK-18258
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Reporter: Cody Koeninger


Transactional "exactly-once" semantics for output require storing an offset 
identifier in the same transaction as results.

The Sink.addBatch method currently only has access to batchId and data, not the 
actual offset representation.

I want to store the actual offsets, so that they are recoverable as long as the
results are and I'm not locked in to a particular streaming engine.

I could see this being accomplished by adding parameters to Sink.addBatch for 
the starting and ending offsets (either the offsets themselves, or the 
SPARK-17829 string/json representation).  That would be an API change, but if 
there's another way to map batch ids to offset representations without changing 
the Sink api that would work as well.  

I'm assuming we don't need the same level of access to offsets throughout a job 
as e.g. the Kafka dstream gives, because Sinks are the main place that should 
need them.
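
To make the first point concrete, here is a minimal sketch of why the offsets belong in the same transaction as the results. Everything in it is hypothetical: BatchOffsets and TransactionalStore are in-memory stand-ins for illustration, not Spark or Kafka APIs, and a real sink would use a database transaction instead of a synchronized block.

```scala
object OffsetsWithResults {
  // Hypothetical description of one batch for one Kafka partition.
  final case class BatchOffsets(topic: String, partition: Int, from: Long, until: Long)

  // Hypothetical in-memory stand-in for a transactional database.
  final class TransactionalStore {
    private var results = Vector.empty[String]
    private var offsets = Map.empty[(String, Int), Long]

    // Stores the rows and the ending offset in one atomic step.
    // Returns false, and changes nothing, when the batch does not start
    // where the last committed batch ended (for example a replay after failure).
    def commit(batch: BatchOffsets, rows: Seq[String]): Boolean = synchronized {
      val key = (batch.topic, batch.partition)
      if (offsets.getOrElse(key, 0L) != batch.from) false
      else {
        results = results ++ rows
        offsets = offsets.updated(key, batch.until)
        true
      }
    }

    def allRows: Vector[String] = synchronized(results)

    def committed(topic: String, partition: Int): Long =
      synchronized(offsets.getOrElse((topic, partition), 0L))
  }

  def main(args: Array[String]): Unit = {
    val store = new TransactionalStore
    val batch = BatchOffsets("events", 0, from = 0L, until = 2L)
    println(store.commit(batch, Seq("a", "b"))) // first attempt
    println(store.commit(batch, Seq("a", "b"))) // replay of the same batch
    println(store.allRows)
  }
}
```

With only a batchId available, the store could still reject replays, but the stored state would not say where in Kafka to resume if the job were moved to a different engine. That portability is the reason for asking for the actual offsets.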






[jira] [Commented] (SPARK-17938) Backpressure rate not adjusting

2016-11-02 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629332#comment-15629332
 ] 

Cody Koeninger commented on SPARK-17938:


Direct stream isn't a receiver, receiver settings don't apply to it.
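
As a rough sketch of what does apply to the direct stream: spark.streaming.kafka.maxRatePerPartition caps the records read per second from each Kafka partition, which bounds the batch size as computed below. The setting names are real Spark settings; the values, the object name, and the helper function are illustrative assumptions. Receiver-only keys such as spark.streaming.receiver.maxRate are left out on purpose.

```scala
object DirectStreamRateCap {
  // Real Spark setting names; the values are illustrative only.
  val conf: Map[String, String] = Map(
    "spark.streaming.backpressure.enabled" -> "true",
    "spark.streaming.kafka.maxRatePerPartition" -> "100"
  )

  // Upper bound on records in one batch: per-partition rate (records/second)
  // times the batch length, multiplied by the number of Kafka partitions.
  // Returns None when no per-partition cap is configured.
  def maxRecordsPerBatch(
      conf: Map[String, String],
      batchMillis: Long,
      partitions: Int): Option[Long] =
    conf.get("spark.streaming.kafka.maxRatePerPartition").map { rate =>
      val perPartition = (rate.toLong * batchMillis) / 1000L
      perPartition * partitions
    }

  def main(args: Array[String]): Unit =
    println(maxRecordsPerBatch(conf, batchMillis = 1000L, partitions = 4))
}
```

Backpressure can only lower the effective rate below this cap, never raise it.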

> Backpressure rate not adjusting
> ---
>
> Key: SPARK-17938
> URL: https://issues.apache.org/jira/browse/SPARK-17938
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Samy Dindane
>
> spark-streaming 2.0.1 and spark-streaming-kafka-0-10 version is 2.0.1. Same 
> behavior with 2.0.0 though.
> spark.streaming.kafka.consumer.poll.ms is set to 3
> spark.streaming.kafka.maxRatePerPartition is set to 10
> spark.streaming.backpressure.enabled is set to true
> `batchDuration` of the streaming context is set to 1 second.
> I consume a Kafka topic using KafkaUtils.createDirectStream().
> My system can handle 100k-record batches, but it'd take more than 1 second 
> to process them all. I'd thus expect the backpressure to reduce the number of 
> records fetched in the next batch, to keep the processing delay 
> below 1 second.
> But this does not happen, and the backpressure rate stays the same: 
> stuck at `100.0`, no matter how the other variables change (processing time, 
> error, etc.).
> Here's a log showing how all these variables change but the chosen rate stays 
> the same: https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba (I 
> would have attached a file but I don't see how).
> Is this the expected behavior and I am missing something, or is this a bug?
> I'll gladly help by providing more information or writing code if necessary.
> Thank you.






Re: Handling questions in the mailing lists

2016-11-02 Thread Cody Koeninger
So concrete things people could do

- users could tag subject lines appropriately to the component they're
asking about

- contributors could monitor user@ for tags relating to components
they've worked on.
I'd be surprised if my miss rate for any mailing list questions
well-labeled as Kafka was higher than 5%

- committers could be more aggressive about soliciting and merging PRs
to improve documentation.
It's a lot easier to answer even poorly-asked questions with a link to
relevant docs.

On Wed, Nov 2, 2016 at 7:39 AM, Sean Owen  wrote:
> There's already reviews@ and issues@. dev@ is for project development itself
> and I think is OK. You're suggesting splitting up user@ and I sympathize
> with the motivation. Experience tells me that we'll have a beginner@ that's
> then totally ignored, and people will quickly learn to post to advanced@ to
> get attention, and we'll be back where we started. Putting it in JIRA
> doesn't help. I don't think this is a problem that is merely down to lack of
> process. It actually requires cultivating a culture change on the community
> list.
>
> On Wed, Nov 2, 2016 at 12:11 PM Mendelson, Assaf 
> wrote:
>>
>> What I am suggesting is basically to fix that.
>>
>> For example, we might say that mailing list A is only for voting, mailing
>> list B is only for PR and have something like stack overflow for developer
>> questions (I would even go as far as to have beginner, intermediate and
>> advanced mailing list for users and beginner/advanced for dev).
>>
>>
>>
>> This can easily be done using stack overflow tags, however, that would
>> probably be harder to manage.
>>
>> Maybe using special jira tags and manage it in jira?
>>
>>
>>
>> Anyway as I said, the main issue is not user questions (except maybe
>> advanced ones) but more for dev questions. It is so easy to get lost in the
>> chatter that it makes it very hard for people to learn spark internals…
>>
>> Assaf.
>>
>>
>>
>> From: Sean Owen [mailto:so...@cloudera.com]
>> Sent: Wednesday, November 02, 2016 2:07 PM
>> To: Mendelson, Assaf; dev@spark.apache.org
>> Subject: Re: Handling questions in the mailing lists
>>
>>
>>
>> I think that unfortunately mailing lists don't scale well. This one has
>> thousands of subscribers with different interests and levels of experience.
>> For any given person, most messages will be irrelevant. I also find that a
>> lot of questions on user@ are not well-asked, aren't an SSCCE
>> (http://sscce.org/), not something most people are going to bother replying
>> to even if they could answer. I almost entirely ignore user@ because there
>> are higher-priority channels like PRs to deal with, that already have
>> hundreds of messages per day. This is why little of it gets an answer -- too
>> noisy.
>>
>>
>>
>> We have to have official mailing lists, in any event, to have some
>> official channel for things like votes and announcements. It's not wrong to
>> ask questions on user@ of course, but a lot of the questions I see could
>> have been answered with research of existing docs or looking at the code. I
>> think that given the scale of the list, it's not wrong to assert that this
>> is sort of a prerequisite for asking thousands of people to answer one's
>> question. But we can't enforce that.
>>
>>
>>
>> The situation will get better to the extent people ask better questions,
>> help other people ask better questions, and answer good questions. I'd
>> encourage anyone feeling this way to try to help along those dimensions.
>>
>> On Wed, Nov 2, 2016 at 11:32 AM assaf.mendelson 
>> wrote:
>>
>> Hi,
>>
>> I know this is a little off topic but I wanted to raise an issue about
>> handling questions in the mailing list (this is true both for the user
>> mailing list and the dev but since there are other options such as stack
>> overflow for user questions, this is more problematic in dev).
>>
>> Let’s say I ask a question (as I recently did). Unfortunately this was
>> during spark summit in Europe so probably people were busy. In any case no
>> one answered.
>>
>> The problem is, that if no one answers very soon, the question will almost
>> certainly remain unanswered because new messages will simply drown it.
>>
>>
>>
>> This is a common issue not just for questions but for any comment or idea
>> which is not immediately picked up.
>>
>>
>>
>> I believe we should have a method of handling this.
>>
>> Generally, I would say these types of things belong in stack overflow,
>> after all, the way it is built is perfect for this. More seasoned spark
>> contributors and committers can periodically check out unanswered questions
>> and answer them.
>>
>> The problem is that stack overflow (as well as other targets such as the
>> databricks forums) tend to have a more user based orientation. This means
>> that any spark internal question will almost certainly remain unanswered.
>>
>>
>>
>> I was wondering 

[jira] [Commented] (SPARK-18212) Flaky test: org.apache.spark.sql.kafka010.KafkaSourceSuite.assign from specific offsets

2016-11-01 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15627838#comment-15627838
 ] 

Cody Koeninger commented on SPARK-18212:


So here's a heavily excerpted version of what I see happening in that log:

{code}
16/11/01 14:08:46.593 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO 
KafkaTestUtils:   Sent 34 to partition 2, offset 3
16/11/01 14:08:46.593 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO 
KafkaProducer: Closing the Kafka producer with timeoutMillis = 
9223372036854775807 ms.
16/11/01 14:08:46.596 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO 
KafkaTestUtils: Created consumer to get latest offsets


16/11/01 14:08:47.833 Executor task launch worker-2 ERROR Executor: Exception 
in task 1.0 in stage 29.0 (TID 142)
java.lang.AssertionError: assertion failed: Failed to get records for 
spark-kafka-source-a9485cc4-c83d-4e97-a20e-3960565b3fdb-335403166-executor 
topic-5-2 3 after polling for 512


16/11/01 14:08:49.252 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO 
KafkaTestUtils: Closed consumer to get latest offsets
16/11/01 14:08:49.252 pool-1-thread-1-ScalaTest-running-KafkaSourceSuite INFO 
KafkaSourceSuite: Added data, expected offset [(topic-5-0,4), (topic-5-1,4), 
(topic-5-2,4), (topic-5-3,4), (topic-5-4,4)]
{code}


We're waiting on the producer's send future for up to 10 seconds; it takes 
almost 3 seconds between when the producer send finishes and the consumer 
that's being used to verify the post-send offsets finishes; but in the meantime 
we're only waiting half a second for executor fetches.

It's really ugly, but probably the easiest way to make this less flaky is to 
increase the value of kafkaConsumer.pollTimeoutMs to the same order of 
magnitude being used for the other test waits.

[~zsxwing] unless you see anything else wrong in the log or have a better idea, 
I can put in a pr tomorrow to increase that poll timeout in tests.
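
The gaps in the excerpt above can be checked with a small sketch. The timestamps are copied from the log and the 512 ms figure from the assertion message; the object and value names are my own.

```scala
import java.time.{Duration, LocalTime}

object PollTimeoutGap {
  private def at(s: String): LocalTime = LocalTime.parse(s)

  // Timestamps taken from the excerpted log lines.
  val sendDone: LocalTime = at("14:08:46.593")        // producer send finished
  val executorFailed: LocalTime = at("14:08:47.833")  // "Failed to get records"
  val offsetsChecked: LocalTime = at("14:08:49.252")  // verifying consumer closed

  // Poll timeout reported in the assertion message, in milliseconds.
  val pollTimeoutMs: Long = 512L

  val sendToFailureMs: Long = Duration.between(sendDone, executorFailed).toMillis
  val sendToCheckMs: Long = Duration.between(sendDone, offsetsChecked).toMillis

  def main(args: Array[String]): Unit = {
    println(s"send -> executor failure: $sendToFailureMs ms")
    println(s"send -> offsets verified: $sendToCheckMs ms")
    println(s"executor poll timeout:    $pollTimeoutMs ms")
  }
}
```

The verification step is allowed several times longer than the executor fetch, which is why raising the poll timeout to the same order of magnitude looks like the simplest fix.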


> Flaky test: org.apache.spark.sql.kafka010.KafkaSourceSuite.assign from 
> specific offsets
> ---
>
> Key: SPARK-18212
> URL: https://issues.apache.org/jira/browse/SPARK-18212
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Reporter: Davies Liu
>
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.3/1968/testReport/junit/org.apache.spark.sql.kafka010/KafkaSourceSuite/assign_from_specific_offsets/






Re: Does Data pipeline using kafka and structured streaming work?

2016-11-01 Thread Cody Koeninger
One thing you should be aware of (that's a showstopper for my use
cases, but may not be for yours) is that you can provide Kafka offsets
to start from, but you can't really get access to offsets and metadata
during the job on a per-batch or per-partition basis, just on a
per-message basis.

On Tue, Nov 1, 2016 at 8:29 PM, Michael Armbrust <mich...@databricks.com> wrote:
> Yeah, those are all requests for additional features / version support.
> I've been using kafka with structured streaming to do both ETL into
> partitioned parquet tables as well as streaming event time windowed
> aggregation for several weeks now.
>
> On Tue, Nov 1, 2016 at 6:18 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Look at the subtasks attached to that ticket you linked.
>> Some of them are unresolved, but basic functionality is there.
>>
>> On Tue, Nov 1, 2016 at 7:37 PM, shyla deshpande
>> <deshpandesh...@gmail.com> wrote:
>> > Hi Michael,
>> >
>> > Thanks for the reply.
>> >
>> > The following link says there is an open, unresolved Jira for Structured
>> > streaming support for consuming from Kafka.
>> >
>> > https://issues.apache.org/jira/browse/SPARK-15406
>> >
>> > Appreciate your help.
>> >
>> > -Shyla
>> >
>> >
>> > On Tue, Nov 1, 2016 at 5:19 PM, Michael Armbrust
>> > <mich...@databricks.com>
>> > wrote:
>> >>
>> >> I'm not aware of any open issues against the kafka source for
>> >> structured
>> >> streaming.
>> >>
>> >> On Tue, Nov 1, 2016 at 4:45 PM, shyla deshpande
>> >> <deshpandesh...@gmail.com>
>> >> wrote:
>> >>>
>> >>> I am building a data pipeline using Kafka, Spark streaming and
>> >>> Cassandra.
>> >>> Wondering if the issues with the Kafka source are fixed in Spark 2.0.1.
>> >>> If not, please give me an update on when they may be fixed.
>> >>>
>> >>> Thanks
>> >>> -Shyla
>> >>
>> >>
>> >
>
>




Re: Does Data pipeline using kafka and structured streaming work?

2016-11-01 Thread Cody Koeninger
Look at the subtasks attached to that ticket you linked.
Some of them are unresolved, but basic functionality is there.

On Tue, Nov 1, 2016 at 7:37 PM, shyla deshpande
 wrote:
> Hi Michael,
>
> Thanks for the reply.
>
> The following link says there is an open, unresolved Jira for Structured
> streaming support for consuming from Kafka.
>
> https://issues.apache.org/jira/browse/SPARK-15406
>
> Appreciate your help.
>
> -Shyla
>
>
> On Tue, Nov 1, 2016 at 5:19 PM, Michael Armbrust 
> wrote:
>>
>> I'm not aware of any open issues against the kafka source for structured
>> streaming.
>>
>> On Tue, Nov 1, 2016 at 4:45 PM, shyla deshpande 
>> wrote:
>>>
>>> I am building a data pipeline using Kafka, Spark streaming and Cassandra.
>>> Wondering if the issues with the Kafka source are fixed in Spark 2.0.1. If not,
>>> please give me an update on when it may be fixed.
>>>
>>> Thanks
>>> -Shyla
>>
>>
>




[jira] [Commented] (SPARK-17935) Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module

2016-11-01 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15626774#comment-15626774
 ] 

Cody Koeninger commented on SPARK-17935:


Some other things to think about:
- are there any producer configurations you don't want to support?
- specifically, are you only going to support byte array serializers for 
writing key and value?
- if you're only supporting byte array, how do you clearly document for users 
how to handle their common use case (i.e. I have one string column I want to be 
the key, and the other columns should be json name/value pairs in the message)?

> Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
> --
>
> Key: SPARK-17935
> URL: https://issues.apache.org/jira/browse/SPARK-17935
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Streaming
>Affects Versions: 2.0.0
>Reporter: zhangxinyu
>
> Now spark already supports kafkaInputStream. It would be useful that we add 
> `KafkaForeachWriter` to output results to kafka in structured streaming 
> module.
> `KafkaForeachWriter.scala` is put in external kafka-0.8.0.






Re: JIRA Components for Streaming

2016-10-31 Thread Cody Koeninger
Makes sense to me.

I do wonder if e.g.

[SPARK-12345][STRUCTUREDSTREAMING][KAFKA]

is going to leave any room in the Github PR form for actual title content?

On Mon, Oct 31, 2016 at 1:37 PM, Michael Armbrust
 wrote:
> I'm planning to do a little maintenance on JIRA to hopefully improve the
> visibility into the progress / gaps in Structured Streaming.  In particular,
> while we share a lot of optimization / execution logic with SQL, the set of
> desired features and bugs is fairly different.
>
> Proposal:
>   - Structured Streaming (new component, move existing tickets here)
>   - Streaming -> DStreams
>
> Thoughts, objections?
>
> Michael




Re: Odp.: Spark Improvement Proposals

2016-10-31 Thread Cody Koeninger
Now that spark summit europe is over, are any committers interested in
moving forward with this?

https://github.com/koeninger/spark-1/blob/SIP-0/docs/spark-improvement-proposals.md

Or are we going to let this discussion die on the vine?

On Mon, Oct 17, 2016 at 10:05 AM, Tomasz Gawęda
<tomasz.gaw...@outlook.com> wrote:
> Maybe my mail was not clear enough.
>
>
> I didn't want to write "lets focus on Flink" or any other framework. The
> idea with benchmarks was to show two things:
>
> - why some people are doing bad PR for Spark
>
> - how - in easy way - we can change it and show that Spark is still on the
> top
>
>
> No more, no less. Benchmarks will be helpful, but I don't think they're the
> most important thing in Spark :) On the Spark main page there is still chart
> "Spark vs Hadoop". It is important to show that framework is not the same
> Spark with other API, but much faster and optimized, comparable or even
> faster than other frameworks.
>
>
> About real-time streaming, I think it would just be good to see it in Spark.
> I really like the current Spark model, but there are many voices saying "we
> need more" - the community should also listen to them and try to help them.
> With SIPs it would be easier; I've just posted this example as a "thing that
> may be changed with a SIP".
>
>
> I really like unification via Datasets, but there are a lot of algorithms
> inside - let's make an easy API, but with strong background (articles,
> benchmarks, descriptions, etc.) that shows that Spark is still a modern
> framework.
>
>
> Maybe now my intention will be clearer :) As I said, organizational ideas
> were already mentioned and I agree with them; my mail was just to show some
> aspects from my side, that is, from the side of a developer and a person who
> is trying to help others with Spark (via StackOverflow or other ways).
>
>
> Pozdrawiam / Best regards,
>
> Tomasz
>
>
> 
> From: Cody Koeninger <c...@koeninger.org>
> Sent: 17 October 2016 16:46
> To: Debasish Das
> Cc: Tomasz Gawęda; dev@spark.apache.org
> Subject: Re: Spark Improvement Proposals
>
> I think narrowly focusing on Flink or benchmarks is missing my point.
>
> My point is evolve or die.  Spark's governance and organization is
> hampering its ability to evolve technologically, and it needs to
> change.
>
> On Sun, Oct 16, 2016 at 9:21 PM, Debasish Das <debasish.da...@gmail.com>
> wrote:
>> Thanks Cody for bringing up a valid point...I picked up Spark in 2014 as
>> soon as I looked into it since compared to writing Java map-reduce and
>> Cascading code, Spark made writing distributed code fun...But now as we
>> went
>> deeper with Spark and real-time streaming use-case gets more prominent, I
>> think it is time to bring a messaging model in conjunction with the
>> batch/micro-batch API that Spark is good at...akka-streams close
>> integration with spark micro-batching APIs looks like a great direction to
>> stay in the game with Apache Flink...Spark 2.0 integrated streaming with
>> batch with the assumption that micro-batching is sufficient to run SQL
>> commands on streams, but do we really have time to do SQL processing on
>> streaming data within 1-2 seconds?
>>
>> After reading the email chain, I started to look into Flink documentation
>> and if you compare it with Spark documentation, I think we have major work
>> to do detailing out Spark internals so that more people from community
>> start
>> to take active role in improving the issues so that Spark stays strong
>> compared to Flink.
>>
>> https://cwiki.apache.org/confluence/display/SPARK/Spark+Internals
>>
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
>>
>> Spark is no longer an engine that works for micro-batch and batch...We
>> (and
>> I am sure many others) are pushing spark as an engine for stream and query
>> processing.we need to make it a state-of-the-art engine for high speed
>> streaming data and user queries as well !
>>
>> On Sun, Oct 16, 2016 at 1:30 PM, Tomasz Gawęda <tomasz.gaw...@outlook.com>
>> wrote:
>>>
>>> Hi everyone,
>>>
>>> I'm quite late with my answer, but I think my suggestions may help a
>>> little bit. :) Many technical and organizational topics were mentioned,
>>> but I want to focus on these negative posts about Spark and about
>>> "haters"
>>>
>>> I really like Spark. Ease of use, speed, very good community - it's
>>> everything here. But every project has to "fight" on the "framework market"

Re: MapWithState partitioning

2016-10-31 Thread Cody Koeninger
You may know that those streams share the same keys, but Spark doesn't
unless you tell it.

mapWithState takes a StateSpec, which should allow you to specify a partitioner.
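
A minimal sketch of what that could look like, assuming a DStream of (String, Long) pairs and a running count as the state. The object name, the mapping function, and the choice of HashPartitioner are illustrative; a custom Partitioner that mirrors your Kafka partitioning would be passed the same way.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

object StatePartitioningSketch {
  // Running total per key; the state holds the total seen so far.
  val updateCount = (key: String, value: Option[Long], state: State[Long]) => {
    val total = state.getOption.getOrElse(0L) + value.getOrElse(0L)
    state.update(total)
    (key, total)
  }

  def withExplicitPartitioner(
      pairs: DStream[(String, Long)],
      numPartitions: Int): DStream[(String, Long)] = {
    // The partitioner set here is the one used for the state RDDs.
    val spec = StateSpec
      .function(updateCount)
      .partitioner(new HashPartitioner(numPartitions))

    pairs.mapWithState(spec)
  }
}
```

Note that this only controls how the state is partitioned. Input RDDs that carry no partitioner of their own, as is the case for KafkaRDD, would still be shuffled to match it.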

On Mon, Oct 31, 2016 at 9:40 AM, Andrii Biletskyi <andrb...@gmail.com> wrote:
> Thanks for response,
>
> So as I understand there is no way to "tell" mapWithState to leave the
> partitioning schema as any other transformation would normally do.
> Then I would like to clarify if there is a simple way to do a transformation
> to a key-value stream and specify somehow the Partitioner that effectively
> would result in the same partitioning schema as the original stream.
> I.e.:
>
> stream.mapPartitions({ crs =>
>   crs.map { cr =>
> cr.key() -> cr.value()
>   }
> }) <--- specify somehow Partitioner here for the resulting rdd.
>
>
> The reason I ask is that it simply looks strange to me that Spark will have
> to shuffle my input stream and "state" stream each time during the
> mapWithState operation when I know for sure that those two streams will
> always share the same keys and will not need access to other partitions.
>
> Thanks,
> Andrii
>
>
> 2016-10-31 15:45 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>> If you call a transformation on an rdd using the same partitioner as that
>> rdd, no shuffle will occur.  KafkaRDD doesn't have a partitioner, there's no
>> consistent partitioning scheme that works for all kafka uses. You can wrap
>> each kafkardd with an rdd that has a custom partitioner that you write to
>> match your kafka partitioning scheme, and avoid a shuffle.
>>
>> The danger there is if you have any misbehaving producers, or translate
>> the partitioning wrongly, you'll get bad results. It's safer just to
>> shuffle.
>>
>>
>> On Oct 31, 2016 04:31, "Andrii Biletskyi"
>> <andrii.bilets...@yahoo.com.invalid> wrote:
>>
>> Hi all,
>>
>> I'm using Spark Streaming mapWithState operation to do a stateful
>> operation on my Kafka stream (though I think similar arguments would apply
>> for any source).
>>
>> Trying to understand a way to control mapWithState's partitioning schema.
>>
>> My transformations are simple:
>>
>> 1) create KafkaDStream
>> 2) mapPartitions to get a key-value stream where `key` corresponds to
>> Kafka message key
>> 3) apply mapWithState operation on key-value stream, the state stream
>> shares keys with the original stream, the resulting stream doesn't change
>> keys either
>>
>> The problem is that, as I understand, mapWithState stream has a different
>> partitioning schema and thus I see shuffles in Spark Web UI.
>>
>> From the mapWithState implementation I see that:
>> mapWithState uses Partitioner if specified, otherwise partitions data with
>> HashPartitioner(). The thing is that original
>> KafkaDStream has a specific partitioning schema: Kafka partitions correspond
>> to Spark RDD partitions.
>>
>> Question: is there a way for mapWithState stream to inherit partitioning
>> schema from the original stream (i.e. correspond to Kafka partitions)?
>>
>> Thanks,
>> Andrii
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: MapWithState partitioning

2016-10-31 Thread Cody Koeninger
If you call a transformation on an rdd using the same partitioner as that
rdd, no shuffle will occur.  KafkaRDD doesn't have a partitioner, there's
no consistent partitioning scheme that works for all kafka uses. You can
wrap each kafkardd with an rdd that has a custom partitioner that you write
to match your kafka partitioning scheme, and avoid a shuffle.

The danger there is if you have any misbehaving producers, or translate the
partitioning wrongly, you'll get bad results. It's safer just to shuffle.
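
A rough sketch of such a wrapper, assuming Spark core on the classpath. `KafkaKeyPartitioner` and `AssumePartitionedRDD` are hypothetical names, and the hash rule in `getPartition` is only a placeholder: it has to reproduce exactly how your producers assign keys to partitions. The sketch also assumes RDD partition index i holds Kafka partition i, which should be checked against the batch's offset ranges.

```scala
import scala.reflect.ClassTag
import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partitioner. The key-to-partition rule below is a placeholder
// and must be replaced by the rule your Kafka producers really use.
class KafkaKeyPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val h = key.hashCode % numPartitions
    if (h < 0) h + numPartitions else h
  }
  // Spark compares partitioners with equals to decide whether to shuffle.
  override def equals(other: Any): Boolean = other match {
    case p: KafkaKeyPartitioner => p.numPartitions == numPartitions
    case _ => false
  }
  override def hashCode: Int = numPartitions
}

// Pass-through RDD that only claims a partitioner; it moves no data.
// If the claim is wrong, downstream results will be wrong without any error.
class AssumePartitionedRDD[K: ClassTag, V: ClassTag](prev: RDD[(K, V)], part: Partitioner)
    extends RDD[(K, V)](prev) {
  override val partitioner: Option[Partitioner] = Some(part)
  override protected def getPartitions: Array[Partition] =
    firstParent[(K, V)].partitions
  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
    firstParent[(K, V)].iterator(split, context)
}
```

Nothing here verifies that the data is really laid out that way, which is why shuffling remains the safer default.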

On Oct 31, 2016 04:31, "Andrii Biletskyi"
 wrote:

Hi all,

I'm using Spark Streaming mapWithState operation to do a stateful operation
on my Kafka stream (though I think similar arguments would apply for any
source).

Trying to understand a way to control mapWithState's partitioning schema.

My transformations are simple:

1) create KafkaDStream
2) mapPartitions to get a key-value stream where `key` corresponds to Kafka
message key
3) apply mapWithState operation on key-value stream, the state stream
shares keys with the original stream, the resulting stream doesn't change
keys either

The problem is that, as I understand, mapWithState stream has a different
partitioning schema and thus I see shuffles in Spark Web UI.

From the mapWithState implementation I see that:
mapWithState uses Partitioner if specified, otherwise partitions data with
HashPartitioner(). The thing is that original
KafkaDStream has a specific partitioning schema: Kafka partitions
correspond to Spark RDD partitions.

Question: is there a way for mapWithState stream to inherit partitioning
schema from the original stream (i.e. correspond to Kafka partitions)?

Thanks,
Andrii


Re: Reason for Kafka topic existence check / "Does the topic exist?" error

2016-10-29 Thread Cody Koeninger
I tested your claims that "it used to work that way", and was unable
to reproduce them.  As far as I can tell, streams have always failed
the very first time you start them in that situation.  As Chris and I
pointed out, there are good reasons for that.

If you don't want to operationalize topic creation, just start the
stream again after it fails the very first time you start it with a
new topic.  If you don't want to operationalize monitoring whether
streams actually started, especially when it fails within seconds, I
don't know what more I can say.
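
For what it's worth, the "start it again" step can be sketched as a small retry helper wrapped around whatever code creates and starts the stream. `RetrySketch.retry` and its parameters are hypothetical plain Scala, not part of Spark, and whether a retry in the same driver is enough depends on the broker having auto.create.topics.enable set to true.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Runs op; on a non-fatal failure waits delayMs and tries again,
  // up to attempts times in total. The last failure is rethrown.
  @tailrec
  def retry[T](attempts: Int, delayMs: Long)(op: => T): T =
    Try(op) match {
      case Success(value) => value
      case Failure(_) if attempts > 1 =>
        Thread.sleep(delayMs)
        retry(attempts - 1, delayMs)(op)
      case Failure(error) => throw error
    }
}
```

A call might look like `RetrySketch.retry(2, 5000L) { startStream() }`, where `startStream` stands in for your own code. This does not replace monitoring whether the stream actually started.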

On Sat, Oct 29, 2016 at 8:52 AM, Dmitry Goldenberg
<dgoldenberg...@gmail.com> wrote:
> Cody,
>
> Thanks for your comments.
>
> The way I'm reading the Kafka documentation
> (https://kafka.apache.org/documentation) is that auto.create.topics.enable
> is set to true by default. Right now it's not set in our server.properties
> on the Kafka broker side so I would imagine that the first request to
> publish a document into topic X would cause X to be created, as
> auto.create.topics.enable is presumably defaulted to true.
>
> Basically, I used to be able to start a streaming Kafka job first, without
> the topic X already existing, then let the producer publish the first (and
> all subsequent) documents and the consumer would get the documents from that
> point.
>
> This mode is not working anymore. Despite auto.create.topics.enable
> presumably defaulting to true (?), I'm getting the "Does the topic exist"
> exception.
>
> Not a big problem but raises the question of, when would the topic be
> "auto-created" if not on the first document being published to it?
>
> It was nice when it was working because we didn't have to operationalize
> topic creation. Not a big deal but now we'll have to make sure we execute
> the 'create-topics' type of task or shell script at install time.
>
> This seems like a Kafka doc issue potentially, to explain what exactly one
> can expect from the auto.create.topics.enable flag.
>
> -Dmitry
>
>
> On Sat, Oct 8, 2016 at 1:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> So I just now retested this with 1.5.2, and 2.0.0, and the behavior is
>> exactly the same across spark versions.
>>
>> If the topic hasn't been created, you will get that error on startup,
>> because the topic doesn't exist and thus doesn't have metadata.
>>
>> If you have auto.create.topics.enable set to true on the broker
>> config, the request will fairly quickly lead to the topic being
>> created after the fact.
>>
>> All you have to do is hit up-arrow-enter and re-submit the spark job,
>> the second time around the topic will exist.  That seems pretty low
>> effort.
>>
>> I'd rather stick with having an early error for those of us that
>> prefer to run with auto.create set to false (because it makes sure the
>> topic is actually set up the way you want, reduces the likelihood of
>> spurious topics being created, etc).
>>
>>
>>
>> On Sat, Oct 8, 2016 at 11:44 AM, Dmitry Goldenberg
>> <dgoldenberg...@gmail.com> wrote:
>> > Hi,
>> >
>> > I am trying to start up a simple consumer that streams from a Kafka
>> > topic,
>> > using Spark 2.0.0:
>> >
>> > spark-streaming_2.11
>> > spark-streaming-kafka-0-8_2.11
>> >
>> > I was getting an error as below until I created the topic in Kafka. From
>> > integrating Spark 1.5, I never used to hit this check; we were able to
>> > start
>> > all of our Spark Kafka consumers, then start the producers, and have
>> > Kafka
>> > automatically create the topics once the first message for a given topic
>> > was
>> > published.
>> >
>> > Is there something I might be doing to cause this topic existence check
>> > in
>> > KafkaCluster.scala to kick in? I'd much rather be able to not have to
>> > pre-create the topics before I start the consumers.  Any
>> > thoughts/comments
>> > would be appreciated.
>> >
>> > Thanks.
>> > - Dmitry
>> >
>> > 
>> >
>> > Exception in thread "main" org.apache.spark.SparkException:
>> > java.nio.channels.ClosedChannelException
>> >
>> > java.nio.channels.ClosedChannelException
>> >
>> > org.apache.spark.SparkException: Error getting partition metadata for
>> > ''. Does the topic exist?
>> >
>> > at
>> >
>> > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$

[jira] [Commented] (SPARK-17935) Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module

2016-10-27 Thread Cody Koeninger (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612663#comment-15612663 ]

Cody Koeninger commented on SPARK-17935:


So the main thing to point out is that Kafka producers currently aren't 
idempotent, so this sink can't be fault-tolerant.

Regarding the design doc, a couple of comments:

- KafkaSinkRDD  Why is this necessary?  Seems like KafkaSink should do 
basically the same as existing ForeachSink class

- CachedKafkaProducer  Why is this necessary?  A singleton producer per JVM is 
generally what's recommended by kafka docs.




> Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
> --
>
> Key: SPARK-17935
> URL: https://issues.apache.org/jira/browse/SPARK-17935
> Project: Spark
> Issue Type: Improvement
> Components: SQL, Streaming
> Affects Versions: 2.0.0
> Reporter: zhangxinyu
>
> Spark already supports kafkaInputStream. It would be useful to add
> `KafkaForeachWriter` to output results to Kafka in the structured streaming
> module.
> `KafkaForeachWriter.scala` is put in external kafka-0.8.0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Zero Data Loss in Spark with Kafka

2016-10-26 Thread Cody Koeninger
Honestly, I would stay far away from saving offsets in Zookeeper if at
all possible. It's better to store them alongside your results.
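
As a small illustration of "alongside your results", assuming file-based output: the sketch below derives a deterministic suffix from a batch's offset ranges, so a replayed batch writes to the same location and the offsets can be read back from the stored results. `Range` is a local stand-in that mirrors the fields of Spark's `OffsetRange`, and `batchSuffix` is a hypothetical helper, not a Spark API.

```scala
object OffsetPaths {
  // Local stand-in mirroring the fields of Spark's OffsetRange
  // (an assumption made only to keep the sketch self-contained).
  final case class Range(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // Builds a stable name for a batch: same offsets in, same suffix out,
  // regardless of the order in which the ranges are supplied.
  def batchSuffix(ranges: Seq[Range]): String =
    ranges
      .sortBy(r => (r.topic, r.partition))
      .map(r => s"${r.topic}-${r.partition}-${r.fromOffset}-${r.untilOffset}")
      .mkString("_")
}
```

With the direct stream, the real ranges for a batch come from `rdd.asInstanceOf[HasOffsetRanges].offsetRanges`. For a transactional store, the same idea applies by writing the offsets and the results in one transaction.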

On Wed, Oct 26, 2016 at 10:44 AM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> This is enough to get it to work:
>
> df.save(conf.getString("ParquetOutputPath")+offsetSaved, "parquet",
> SaveMode.Overwrite)
>
> And tests so far (in local env) seem good with the edits. Yet to test on the
> cluster. Cody, appreciate your thoughts on the edits.
>
> Just want to make sure I am not overdoing it or overlooking a potential
> issue.
>
> regards
>
> Sunita
>
>
> On Tue, Oct 25, 2016 at 2:38 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>>
>> The error in the file I just shared is here:
>>
>> val partitionOffsetPath:String = topicDirs.consumerOffsetDir + "/" +
>> partition._2(0);  --> this was just partition and hence there was an error
>> fetching the offset.
>>
>> Still testing. Somehow, Cody, your code never led to "file already exists"
>> errors. (I am saving the output of the dstream as a parquet file, after
>> converting it to a dataframe. The batch interval will be 2 hrs.)
>>
>> The code in the main is here:
>>
>>   val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>>     conf.getString("groupId"), conf.getString("topics"))
>>   val storedOffsets = offsetsStore.readOffsets()
>>   LogHandler.log.info("Fetched the offset from zookeeper")
>>
>>   val kafkaArr = storedOffsets match {
>>     case None =>
>>       // start from the initial offsets
>>       KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>         ssc, kafkaProps, Set(topics))
>>
>>     case Some(fromOffsets) =>
>>       // start from previously saved offsets
>>       val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, Array[Byte]) =
>>         (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)
>>       KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder,
>>         Tuple2[String, Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)
>>
>>       // KafkaUtils.createRDD[String, Row, StringDecoder, ProtobufMessage,
>>       //   (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
>>   }
>>
>>   kafkaArr.foreachRDD { (rdd, time) =>
>>     val schema = SchemaConverters.toSqlType(BeaconAvroData.getClassSchema)
>>       .dataType.asInstanceOf[StructType]
>>     val ardd: RDD[Row] = rdd.mapPartitions { itr =>
>>       itr.map(r => Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
>>     }
>>     val df = sql.createDataFrame(ardd, schema)
>>     LogHandler.log.info("Created dataframe")
>>     val offsetSaved =
>>       offsetsStore.saveOffsets(topics, rdd).replace(":", "-").replace(",", "_")
>>     LogHandler.log.info("Saved offset to Zookeeper")
>>     df.saveAsParquetFile(conf.getString("ParquetOutputPath") + offsetSaved)
>>     LogHandler.log.info("Created the parquet file")
>>   }
>>
>> Thanks
>>
>> Sunita
>>
>>
>>
>>
>>
>> On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind <sunitarv...@gmail.com>
>> wrote:
>>>
>>> Attached is the edited code. Am I heading in the right direction? Also, I am
>>> missing something: it seems to work well as long as the application is
>>> running, and the files are created correctly. But as soon as I restart the
>>> application, it goes back to fromOffset as 0. Any thoughts?
>>>
>>> regards
>>> Sunita
>>>
>>> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com>
>>> wrote:
>>>>
>>>> Thanks for confirming Cody.
>>>> To get to use the library, I had to do:
>>>>
>>>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>>>> "/consumers/topics/"+ topics + "/0")
>>>>
>>>> It worked well. However, I had to specify the partitionId in the zkPath.
>>>> If I want the library to pick up all the partitions for a topic without my
>>>> specifying the path, is that possible out of the box, or do I need to tweak it?
>>>>
>>>> regards
>>>> Sunita
>>>>
>>>>
>>>> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@ko
