Re: FW: Kafka Direct Stream - dynamic topic subscription

2017-10-29 Thread Cody Koeninger
…that contains a little more background on my question. Thank you, Regards, Buvana. From: Ramanan, Buvana (Nokia - US/Murray Hill) [mailto:buvana.rama...@nokia-bell-labs.com] Sent: Friday, October 27, 2017 10:46 PM

Kafka Direct Stream - dynamic topic subscription

2017-10-27 Thread Ramanan, Buvana (Nokia - US/Murray Hill)
Hello, Using Spark 2.2.0. Interested in seeing dynamic topic subscription in action. Tried this example: streaming.DirectKafkaWordCount (which uses org.apache.spark.streaming.kafka010). I start with 8 Kafka partitions in my topic and found that Spark Streaming executes 8 tasks (one per par
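
For reference, a minimal sketch of dynamic (pattern-based) subscription with the kafka-0-10 integration that example uses; the broker address, group id and topic regex are illustrative assumptions, not from the thread:

    import java.util.regex.Pattern
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._

    object DynamicSubscriptionSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("DynamicSub"), Seconds(5))
        val kafkaParams = Map[String, Object](
          "bootstrap.servers"  -> "localhost:9092",        // assumed broker address
          "key.deserializer"   -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id"           -> "dynamic-sub-example")   // assumed group id
        // SubscribePattern subscribes to every topic matching the regex,
        // including matching topics created after the stream starts.
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.SubscribePattern[String, String](
            Pattern.compile("events-.*"), kafkaParams))    // assumed topic pattern
        stream.map(_.value).count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

The direct stream still runs one task per Kafka partition per batch, so a topic with 8 partitions shows up as 8 tasks; newly created matching topics should be picked up on the consumer's next metadata refresh.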

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-19 Thread HARSH TAKKAR
Thanks Cody, It worked for me by keeping the number of executors (each having 1 core) equal to the number of Kafka partitions. On Mon, Sep 18, 2017 at 8:47 PM Cody Koeninger wrote: > Have you searched in jira, e.g. > > https://issues.apache.org/jira/browse/SPARK-19185 > > On Mon, Sep 18, 2017 at 1:56 AM, HARSH

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread Cody Koeninger
Have you searched in jira, e.g. https://issues.apache.org/jira/browse/SPARK-19185 On Mon, Sep 18, 2017 at 1:56 AM, HARSH TAKKAR wrote: > Hi > > Changing spark version is my last resort; is there any other workaround for > this problem. > > > On Mon, Sep 18, 2017 at 11:43 AM pandees waran wrote:

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread HARSH TAKKAR
Hi, Changing the Spark version is my last resort; is there any other workaround for this problem? On Mon, Sep 18, 2017 at 11:43 AM pandees waran wrote: > All, May I know what exactly changed in 2.1.1 which solved this problem? > > Sent from my iPhone > > On Sep 17, 2017, at 11:08 PM, Anastasios Zou

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread pandees waran
All, May I know what exactly changed in 2.1.1 which solved this problem? Sent from my iPhone > On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias wrote: > > Hi, > > I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 > solved my issue. Can you try with 2.1.1 as well and repo

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread Anastasios Zouzias
Hi, I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1 solved my issue. Can you try with 2.1.1 as well and report back? Best, Anastasios. On 17.09.2017 at 16:48, "HARSH TAKKAR" wrote: Hi I am using spark 2.1.0 with scala 2.11.8, and while iterating over the partitions of e

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread kant kodali
You should paste some code. ConcurrentModificationException normally happens when you modify a list or any non-thread safe data structure while you are iterating over it. On Sun, Sep 17, 2017 at 10:25 PM, HARSH TAKKAR wrote: > Hi, > > No we are not creating any thread for kafka DStream > however

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread HARSH TAKKAR
Hi, No, we are not creating any thread for the Kafka DStream; however, we have a single thread for refreshing a resource cache on the driver, but that is totally separate from this connection. On Mon, Sep 18, 2017 at 12:29 AM kant kodali wrote: > Are you creating threads in your application? > > On Sun, Se

Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread kant kodali
Are you creating threads in your application? On Sun, Sep 17, 2017 at 7:48 AM, HARSH TAKKAR wrote: > > Hi > > I am using spark 2.1.0 with scala 2.11.8, and while iterating over the > partitions of each rdd in a dStream formed using KafkaUtils, i am getting > the below exception, please suggest

ConcurrentModificationException using Kafka Direct Stream

2017-09-17 Thread HARSH TAKKAR
Hi, I am using spark 2.1.0 with scala 2.11.8, and while iterating over the partitions of each rdd in a dStream formed using KafkaUtils, I am getting the below exception; please suggest a fix. I have the following Kafka config: enable.auto.commit:"true", auto.commit.interval.ms:"1000", session.timeo
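
For context, the kafka-0-10 docs steer away from enable.auto.commit="true" toward committing after output; a minimal sketch of that pattern (the stream is the kafka010 direct stream, and the per-record work is a stand-in):

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    def processAndCommit(stream: InputDStream[ConsumerRecord[String, String]]): Unit =
      stream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition { iter =>
          iter.foreach(record => println(record.value)) // stand-in for real work
        }
        // commit offsets back to Kafka only after the batch's output succeeded
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }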

Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
…recommend learning Structured Streaming <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> instead. On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan wrote: > I am using Spark Streaming Checkpoint and Kafka Direct Stream. > It

Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread ALunar Beach
…recommend learning Structured Streaming <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> instead. On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan wrote: > I am using Spark Streaming Checkpoint and Kafka Direct Stream.

Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
…On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan wrote: > I am using Spark Streaming Checkpoint and Kafka Direct Stream. > It uses a 30 sec batch duration and normally the job is successful in > 15-20 sec. > > If the spark application fails after the successful completion > (149668428ms in the log b

Fwd: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread anbucheeralan
I am using Spark Streaming Checkpoint and Kafka Direct Stream. It uses a 30 sec batch duration and normally the job is successful in 15-20 sec. If the spark application fails after the successful completion (149668428ms in the log below) and restarts, it's duplicating the last batch

Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread ALunar Beach
I am using Spark Streaming Checkpoint and Kafka Direct Stream. It uses a 30 sec batch duration and normally the job is successful in 15-20 sec. If the spark application fails after the successful completion (149668428ms in the log below) and restarts, it's duplicating the last batch

Re: Spark 2 Kafka Direct Stream Consumer Issue

2017-05-24 Thread Jayadeep J
Could any of the experts kindly advise? On Fri, May 19, 2017 at 6:00 PM, Jayadeep J wrote: > Hi, > > I would appreciate some advice regarding an issue we are facing in > Streaming Kafka Direct Consumer. > > We have recently upgraded our application with Kafka Direct Stream t

Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Cody Koeninger
DStream checkpoints have all kinds of other difficulties, biggest one being you can't use a checkpoint if your app code has been updated. If you can avoid checkpoints in general, I would. On Fri, Oct 21, 2016 at 11:17 AM, Erwan ALLAIN wrote: > Thanks for the fast answer ! > > I just feel annoyed

Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Erwan ALLAIN
Thanks for the fast answer! I just feel annoyed and frustrated not to be able to use spark checkpointing, because I believe that mechanism has been correctly tested. I'm afraid that reinventing the wheel can lead to side effects that I don't see now... Anyway thanks again, I know what I ha

Re: Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Cody Koeninger
0. If your processing time is regularly greater than your batch interval you're going to have problems anyway. Investigate this more, set maxRatePerPartition, something. 1. That's personally what I tend to do. 2. Why are you relying on checkpoints if you're storing offset state in the database?
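
Point 0's rate cap is an ordinary Spark conf key; a one-line sketch (the value 1000 is an arbitrary assumption):

    import org.apache.spark.SparkConf

    // cap records fetched per Kafka partition per second so the batch
    // finishes inside the batch interval
    val conf = new SparkConf()
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")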

Kafka Direct Stream: Offset Managed Manually (Exactly Once)

2016-10-21 Thread Erwan ALLAIN
Hi, I'm currently implementing an exactly-once mechanism based on the following example: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala The pseudo code is as follows: dstream.transform (store offset in a variable on driver side ) ds
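
A rough sketch of the TransactionalPerBatch idea that example implements: write the batch's results and its offset ranges in one database transaction, so on restart the stored offsets say where to resume. The JDBC URL and table names here are assumptions for illustration:

    import java.sql.DriverManager
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.HasOffsetRanges

    def saveTransactionally(stream: InputDStream[ConsumerRecord[String, String]]): Unit =
      stream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // small aggregate collected to the driver for a single transaction
        val counts = rdd.map(r => (r.value, 1L)).reduceByKey(_ + _).collect()
        val conn = DriverManager.getConnection("jdbc:postgresql://localhost/eo") // assumed DB
        try {
          conn.setAutoCommit(false) // results + offsets commit or roll back together
          val ins = conn.prepareStatement(
            "INSERT INTO word_counts (word, cnt) VALUES (?, ?)") // assumed table
          counts.foreach { case (w, c) =>
            ins.setString(1, w); ins.setLong(2, c); ins.executeUpdate()
          }
          val off = conn.prepareStatement(
            "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND part = ?") // assumed table
          offsetRanges.foreach { o =>
            off.setLong(1, o.untilOffset); off.setString(2, o.topic); off.setInt(3, o.partition)
            off.executeUpdate()
          }
          conn.commit()
        } finally conn.close()
      }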

Re: Use cases for kafka direct stream messageHandler

2016-03-09 Thread Cody Koeninger
Yeah, to be clear, I'm talking about having only one constructor for a direct stream, that will give you a stream of ConsumerRecord. Different needs for topic subscription, starting offsets, etc could be handled by calling appropriate methods after construction but before starting the stream. On
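
That is essentially the shape the kafka-0-10 integration ended up with; a hedged sketch (topic name and parameter maps are assumptions):

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010._

    // one constructor; topics, params and starting offsets all travel
    // in the ConsumerStrategy rather than in separate overloads
    def build(ssc: StreamingContext,
              kafkaParams: Map[String, Object],
              fromOffsets: Map[TopicPartition, Long]): InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](
          Seq("topicA"), kafkaParams, fromOffsets)) // "topicA" is hypothetical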

Re: Use cases for kafka direct stream messageHandler

2016-03-09 Thread Alan Braithwaite
I'd probably prefer to keep it the way it is, unless it's becoming more like the function without the messageHandler argument. Right now I have code like this, but I wish it were more similar looking: if (parsed.partitions.isEmpty()) { JavaPairInputDStream kvstream = KafkaUtils

Re: Use cases for kafka direct stream messageHandler

2016-03-08 Thread Cody Koeninger
No, looks like you'd have to catch them in the serializer and have the serializer return option or something. The new consumer builds a buffer full of records, not one at a time. On Mar 8, 2016 4:43 AM, "Marius Soutier" wrote: > > > On 04.03.2016, at 22:39, Cody Koeninger wrote: > > > > The only
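
A sketch of the catch-it-in-the-deserializer approach for the new consumer API: return Option and map failures to None instead of throwing (the UTF-8 string payload is an assumed format):

    import java.util
    import org.apache.kafka.common.serialization.Deserializer

    // Returns None instead of throwing, so one bad record can't kill the batch.
    class SafeStringDeserializer extends Deserializer[Option[String]] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
      override def deserialize(topic: String, data: Array[Byte]): Option[String] =
        Option(data).flatMap { bytes =>
          try Some(new String(bytes, "UTF-8"))
          catch { case _: Exception => None }
        }
      override def close(): Unit = ()
    }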

Re: Use cases for kafka direct stream messageHandler

2016-03-08 Thread Marius Soutier
> On 04.03.2016, at 22:39, Cody Koeninger wrote: > > The only other valid use of messageHandler that I can think of is > catching serialization problems on a per-message basis. But with the > new Kafka consumer library, that doesn't seem feasible anyway, and > could be handled with a custom (de

Use cases for kafka direct stream messageHandler

2016-03-04 Thread Cody Koeninger
Wanted to survey what people are using the direct stream messageHandler for, besides just extracting key / value / offset. Would your use case still work if that argument was removed, and the stream just contained ConsumerRecord objects (http://kafka.apache.org/090/javadoc/org/apache/kafka/clients

Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread yuhang.chenn
Thanks a lot. Sent from WPS Mail. On Feb 27, 2016 at 1:02 AM, Cody Koeninger wrote: Yes. On Thu, Feb 25, 2016 at 9:45 PM, yuhang.chenn wrote: Thanks a lot. And I got another question: What would happen if I didn't set "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to cons

Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread Cody Koeninger
Yes. On Thu, Feb 25, 2016 at 9:45 PM, yuhang.chenn wrote: > Thanks a lot. > And I got another question: What would happen if I didn't set > "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to > consume all the messages in Kafka? > > Sent from WPS Mail > On Feb 25, 2016, Cody Koeninger

Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-26 Thread yuhang.chenn
Thanks a lot. And I got another question: What would happen if I didn't set "spark.streaming.kafka.maxRatePerPartition"? Will Spark Streaming try to consume all the messages in Kafka? Sent from WPS Mail. On Feb 25, 2016 at 11:58 AM, Cody Koeninger wrote: The per partition offsets are part of the rdd as defined on the

Re: How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-24 Thread Cody Koeninger
The per partition offsets are part of the rdd as defined on the driver. Have you read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md and/or watched https://www.youtube.com/watch?v=fXnNEq1v3VA On Wed, Feb 24, 2016 at 9:05 PM, Yuhang Chen wrote: > Hi, as far as I know,

How does Spark streaming's Kafka direct stream survive from worker node failure?

2016-02-24 Thread Yuhang Chen
Hi, as far as I know, there is a 1:1 mapping between Spark partition and Kafka partition, and in Spark's fault-tolerance mechanism, if a partition failed, another partition will be used to recompute those data. And my questions are below: When a partition (worker node) fails in Spark Streaming, 1.

Re: Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-22 Thread Cody Koeninger
…for contributing the kafka direct stream. I have been using the receiver based approach for months but the direct stream is a much better solution for my use case. The job in question is now ported over to the direct stream doing idempotent outputs to Cassandra an

Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-22 Thread Conor Fennell
Hi, Firstly want to say a big thanks to Cody for contributing the kafka direct stream. I have been using the receiver based approach for months but the direct stream is a much better solution for my use case. The job in question is now ported over to the direct stream doing idempotent outputs

Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-21 Thread Conor Fennell
Hi, Firstly want to say a big thanks to Cody for contributing the kafka direct stream. I have been using the receiver based approach for months but the direct stream is a much better solution for my use case. The job in question is now ported over to the direct stream doing idempotent outputs

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Thanks! Indeed not a given. I'm not sure we have the time to wait for nodes within a streaming interval. I'll explore some alternatives. If I stumble on something reasonable I'll report back. -kr, Gerard. On Wed, Oct 14, 2015 at 9:57 PM, Cody Koeninger wrote: > What I'm saying is that it's no

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
What I'm saying is that it's not a given with spark, even in receiver-based mode, because as soon as you lose an executor you'll have a rebalance. Spark's model in general isn't a good fit for pinning work to specific nodes. If you really want to try and fake this, you can override getPreferredLo

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Hi Cody, I think that I misused the term 'data locality'. I think I should better call it "node affinity" instead, as this is what I would like to have: For as long as an executor is available, I would like to have the same kafka partition processed by the same node in order to take advantage of

RE: Node affinity for Kafka-Direct Stream

2015-10-14 Thread prajod.vettiyattil
…Sent: 14 October 2015 18:53 To: Saisai Shao Cc: Rishitesh Mishra; spark users Subject: Re: Node affinity for Kafka-Direct Stream. Thanks Saisai, Mishra, Indeed, that hint will only work in a case where the Spark executor is co-located with the Kafka broker. I think the answer to my question as

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
Assumptions about locality in spark are not very reliable, regardless of what consumer you use. Even if you have locality preferences, and locality wait turned up really high, you still have to account for losing executors. On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas wrote: > Thanks Saisai, Mi

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Thanks Saisai, Mishra, Indeed, that hint will only work in a case where the Spark executor is co-located with the Kafka broker. I think the answer to my question as stated is that there's no guarantee of where the task will execute, as it will depend on the scheduler and the cluster resources available

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
This preferred locality is a hint to spark to schedule Kafka tasks on the preferred nodes; if Kafka and Spark are two separate clusters, obviously this locality hint takes no effect, and spark will schedule tasks following the node-local -> rack-local -> any pattern, like any other spark tasks. On Wed,

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
You could check the code of KafkaRDD: the locality (host) is taken from Kafka's partition and set in KafkaRDD; this will be a hint for Spark to schedule the task on the preferred location. override def getPreferredLocations(thePart: Partition): Seq[String] = { val part = thePart.asInstanceOf[KafkaRDDPart
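
The quoted method is cut off above; its body in the Spark source being quoted is essentially the following (a reconstruction, so treat as a sketch):

    override def getPreferredLocations(thePart: Partition): Seq[String] = {
      val part = thePart.asInstanceOf[KafkaRDDPartition]
      // the Kafka leader's host for this partition, captured when the RDD was defined
      Seq(part.host)
    }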

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Rishitesh Mishra
Hi Gerard, I am also trying to understand the same issue. From whatever code I have seen, it looks like once the Kafka RDD is constructed, the execution of that RDD is up to the task scheduler, and it can schedule the partitions based on the load on nodes. There is a preferred node specified in Kafka RDD. But ASF

Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
In the receiver-based kafka streaming model, given that each receiver starts as a long-running task, one can rely on a certain degree of data locality based on the kafka partitioning: data published on a given topic/partition will land on the same spark streaming receiving node until the receiver

Re: Kafka Direct Stream

2015-10-04 Thread varun sharma
…Cassandra for storing it. Please help with how to transform an iterable to a DStream, or any other workaround for achieving the same. On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase wrote:

Re: Kafka Direct Stream

2015-10-03 Thread Gerard Maas
…or manually emitting a tuple) and use one of the .xxxByKey operators for the processing. If you have a stable, domain specific list of topics (e.g. 3-5 named topics) and the processing is *really* different, I would also look at

Re: Kafka Direct Stream

2015-10-03 Thread varun sharma
…operators for the processing. If you have a stable, domain specific list of topics (e.g. 3-5 named topics) and the processing is *really* different, I would also look at filtering by topic and saving as different Dstreams in your code.

Re: Kafka Direct Stream

2015-10-02 Thread Gerard Maas
…code. Either way you need to start with Cody’s tip in order to extract the topic name. -adrian. From: Cody Koeninger Date: Thursday, October 1, 2015 at 5:06 PM To: Udit Mehta Cc: user Subject: Re: Kaf

Re: Kafka Direct Stream

2015-10-02 Thread varun sharma
…generate N different kafka direct streams? when creating a kafka direct stream you have list of topics - just give one. Then the reusable part of your computations should be extractable as transformations/functions and reused between the streams. Nicu

Re: Kafka Direct Stream

2015-10-02 Thread varun sharma
…Thursday, October 1, 2015 at 5:06 PM To: Udit Mehta Cc: user Subject: Re: Kafka Direct Stream. You can get the topic for a given partition from the offset range. You can either filter using that; or just have a single rdd and match on topic when doing mapPart

Re: Kafka Direct Stream

2015-10-01 Thread Nicolae Marasoiu
Hi, If you just need processing per topic, why not generate N different kafka direct streams? When creating a kafka direct stream you have a list of topics - just give one. Then the reusable part of your computations should be extractable as transformations/functions and reused between the
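
A minimal sketch of that suggestion against the 0.8-era direct API (broker list and topic names are hypothetical):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    def perTopicStreams(ssc: StreamingContext) = {
      val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // assumed brokers
      val topics = List("orders", "clicks")                             // hypothetical topics
      // one direct stream per topic; shared logic becomes a function applied to each
      topics.map { t =>
        t -> KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set(t))
      }.toMap
    }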

Re: Kafka Direct Stream

2015-10-01 Thread Adrian Tanase
…I would also look at filtering by topic and saving as different Dstreams in your code. Either way you need to start with Cody’s tip in order to extract the topic name. -adrian. From: Cody Koeninger Date: Thursday, October 1, 2015 at 5:06 PM To: Udit Mehta Cc: user Subject: Re: Kafka Direct Stream You

Re: Kafka Direct Stream

2015-10-01 Thread Cody Koeninger
You can get the topic for a given partition from the offset range. You can either filter using that; or just have a single rdd and match on topic when doing mapPartitions or foreachPartition (which I think is a better idea) http://spark.apache.org/docs/latest/streaming-kafka-integration.html#appr
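
A sketch of the second option (match on topic inside foreachPartition), using the documented HasOffsetRanges cast; topic names and handling are hypothetical:

    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    // `stream` is the multi-topic direct stream of (key, value) pairs
    def routeByTopic(stream: DStream[(String, String)]): Unit =
      stream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition { iter =>
          // a direct-stream RDD partition maps 1:1 to an offset range
          offsetRanges(TaskContext.get.partitionId).topic match {
            case "orders" => iter.foreach { case (_, v) => println(s"order: $v") }
            case other    => iter.foreach { case (_, v) => println(s"$other: $v") }
          }
        }
      }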

Kafka Direct Stream

2015-09-30 Thread Udit Mehta
Hi, I am using spark direct stream to consume from multiple topics in Kafka. I am able to consume fine but I am stuck at how to separate the data for each topic since I need to process data differently depending on the topic. I basically want to split the RDD consisting of N topics into N RDDs ea

Re: [streaming] reading Kafka direct stream throws kafka.common.OffsetOutOfRangeException

2015-09-30 Thread Cody Koeninger
Offset out of range means the message in question is no longer available on Kafka. What's your kafka log retention set to, and how does that compare to your processing time? On Wed, Sep 30, 2015 at 4:26 AM, Alexey Ponkin wrote: > Hi > > I have simple spark-streaming job(8 executors 1 core - on

[streaming] reading Kafka direct stream throws kafka.common.OffsetOutOfRangeException

2015-09-30 Thread Alexey Ponkin
Hi, I have a simple spark-streaming job (8 executors, 1 core each, on an 8 node cluster) - read from a Kafka topic (3 brokers with 8 partitions) and save to Cassandra. The problem is that when I increase the number of incoming messages in the topic, the job starts to fail with kafka.common.OffsetOutOfRangeExcepti

Re: Kafka Direct Stream join without data shuffle

2015-09-02 Thread Cody Koeninger
No, there isn't a partitioner for KafkaRDD (KafkaRDD may not even be a pair rdd, for instance). It sounds to me like if it's a self-join, you should be able to do it in a single mapPartition operation. On Wed, Sep 2, 2015 at 3:06 PM, Chen Song wrote: > I have a stream got from Kafka with direct
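
A sketch of that single-pass alternative: enrich and pair inside one mapPartitions, so no partitioner or shuffle is involved (the reference table and lookup are assumptions standing in for the thread's data enrichment):

    import org.apache.spark.streaming.dstream.DStream

    def selfJoin(inputStream: DStream[(String, String)],
                 refTable: Map[String, String]): DStream[((String, String), String)] =
      inputStream.mapPartitions { iter =>
        iter.map { record =>
          // pair the raw record with its enriched form in the same pass
          val enriched = refTable.getOrElse(record._1, "unknown") // assumed lookup
          (record, enriched)
        }
      }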

Kafka Direct Stream join without data shuffle

2015-09-02 Thread Chen Song
I have a stream obtained from Kafka with the direct approach, say, inputStream, and I need to: 1. Create another DStream derivedStream with map or mapPartitions (with some data enrichment from a reference table) on inputStream. 2. Join derivedStream with inputStream. In my use case, I don't need to shuffle data. E

Re: Spark off heap memory leak on Yarn with Kafka direct stream

2015-07-13 Thread Apoorva Sareen
…wrote: Hi, I am running spark streaming 1.4.0 on Yarn (Apache distribution 2.6.0) with java 1.8.0_45 and also Kafka direct stream. I am also using spark with scala 2.11 support. The issue I am seeing is that both driver and executor containers are

Re: Spark off heap memory leak on Yarn with Kafka direct stream

2015-07-13 Thread Cody Koeninger
…java 1.8.0_45 and also Kafka direct stream. I am also using spark with scala 2.11 support. The issue I am seeing is that both driver and executor containers are gradually increasing the physical memory usage till a point where the yarn container kills it. I have configured upto 192M Heap and 3

Spark off heap memory leak on Yarn with Kafka direct stream

2015-07-13 Thread Apoorva Sareen
Hi, I am running spark streaming 1.4.0 on Yarn (Apache distribution 2.6.0) with java 1.8.0_45 and also Kafka direct stream. I am also using spark with scala 2.11 support. The issue I am seeing is that both driver and executor containers are gradually increasing the physical memory usage till

Re: Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Benjamin Fradet
…JavaDirectKafkaWordCount.java> for you to start. Thanks, Best Regards. On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni wrote: Hi, If i have a below data format

Re: Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Ashish Soni
…main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java> for you to start. Thanks, Best Regards. On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni wrote: Hi, If i have a below data format, how can i use k

Re: Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Akhil Das
…examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java> for you to start. Thanks, Best Regards. On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni wrote: Hi, If i have a below data format, how can i use kafka direct stream to de-serialize as i am no

Kafka Direct Stream - Custom Serialization and Deserialization

2015-06-26 Thread Ashish Soni
Hi, If i have a below data format, how can i use kafka direct stream to de-serialize it? I am not able to understand all the parameters i need to pass. Can someone explain what the arguments will be, as i am not clear about this: JavaPairInputDStream<K, V> org.apache.spark.stream
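
For what those parameters mean in the 0.8-era signature: K and V are the decoded key/value types, and the remaining type parameters are the kafka.serializer.Decoder classes that produce them. A hedged sketch with a custom value decoder (the Payload type, parsing, broker and topic are assumptions):

    import kafka.serializer.{Decoder, StringDecoder}
    import kafka.utils.VerifiableProperties
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    case class Payload(raw: String) // hypothetical message type

    // 0.8-era decoders take VerifiableProperties in the constructor
    class PayloadDecoder(props: VerifiableProperties = null) extends Decoder[Payload] {
      override def fromBytes(bytes: Array[Byte]): Payload =
        Payload(new String(bytes, "UTF-8")) // replace with real parsing
    }

    def build(ssc: StreamingContext) = {
      val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // assumed brokers
      KafkaUtils.createDirectStream[String, Payload, StringDecoder, PayloadDecoder](
        ssc, kafkaParams, Set("mytopic")) // hypothetical topic
    }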