Re: Rebalancing when adding kafka partitions
The underlying Kafka consumer.

On Tue, Aug 16, 2016 at 2:17 PM, Srikanth wrote:
> Who does the new partition discovery? The underlying Kafka consumer or
> spark-streaming-kafka-0-10-assembly?
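Since discovery lives in the underlying consumer, the knob that should govern how quickly new partitions are noticed is the consumer's own metadata refresh interval, metadata.max.age.ms (Kafka's default is 300000 ms, i.e. 5 minutes). A minimal sketch of passing it through kafkaParams; the broker address and group id are placeholders, and whether this alone fixes the behavior reported below is untested:

    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    // Sketch: lower the consumer's metadata refresh interval so the
    // cluster is re-checked (and new partitions can be seen) every 10s
    // instead of the 5-minute default. Broker/group id are placeholders.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"   -> "localhost:9092",
      "key.deserializer"    -> classOf[ByteArrayDeserializer],
      "value.deserializer"  -> classOf[ByteArrayDeserializer],
      "group.id"            -> "example-group",
      "auto.offset.reset"   -> "latest",
      "metadata.max.age.ms" -> "10000"
    )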
Re: Rebalancing when adding kafka partitions
Yes, SubscribePattern detects new partitions. Also, it has a comment saying:

> Subscribe to all topics matching specified pattern to get dynamically
> assigned partitions.
> The pattern matching will be done periodically against topics existing
> at the time of check.
> @param pattern pattern to subscribe to
> @param kafkaParams Kafka

Who does the new partition discovery? The underlying Kafka consumer or
spark-streaming-kafka-0-10-assembly?

Srikanth

On Fri, Aug 12, 2016 at 5:15 PM, Cody Koeninger wrote:
> Hrrm, that's interesting. Did you try with subscribe pattern, out of
> curiosity?
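For concreteness, a minimal sketch of SubscribePattern with the 0.10 direct stream; the regex is illustrative, and ssc plus the kafkaParams map are assumed to be defined as elsewhere in this thread:

    import java.util.regex.Pattern
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern

    // Subscribe to every topic matching the pattern; matching is redone
    // periodically, so topics (and partitions) created after the job
    // starts can be picked up by the underlying consumer.
    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
      ssc,
      PreferConsistent,
      SubscribePattern[Array[Byte], Array[Byte]](Pattern.compile("events.*"), kafkaParams)
    )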
Re: Rebalancing when adding kafka partitions
Hrrm, that's interesting. Did you try with subscribe pattern, out of
curiosity?

I haven't tested repartitioning on the underlying new Kafka consumer, so
it's possible I misunderstood something.

On Aug 12, 2016 2:47 PM, "Srikanth" wrote:
> I did try a test with Spark 2.0 + spark-streaming-kafka-0-10-assembly.
> I don't see messages from new partitions in the DStream.
Re: Rebalancing when adding kafka partitions
I did try a test with Spark 2.0 + spark-streaming-kafka-0-10-assembly.
The partition count was increased using "bin/kafka-topics.sh --alter" after
the Spark job was started.
I don't see messages from new partitions in the DStream.

    KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
        ssc,
        PreferConsistent,
        Subscribe[Array[Byte], Array[Byte]](topics, kafkaParams)
      )
      .map(r => (r.key(), r.value()))

Also, the number of partitions did not increase either.

    dataStream.foreachRDD { (rdd, curTime) =>
      logger.info(s"rdd has ${rdd.getNumPartitions} partitions.")
    }

Should I be setting some parameter/config? Is the doc for the new
integration available?

Thanks,
Srikanth

On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger wrote:
> No, restarting from a checkpoint won't do it, you need to re-define the
> stream.
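A hedged diagnostic sketch (not from the thread): the 0.10 integration exposes the exact topic-partitions read in each batch through HasOffsetRanges, which shows directly whether a new partition ever appears. Note the cast only works on RDDs taken straight from createDirectStream, before any map; stream below stands for that unmapped DStream:

    import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

    stream.foreachRDD { rdd =>
      // One OffsetRange per topic-partition read in this batch; a newly
      // added partition would show up as an extra entry here.
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic}-${r.partition}: offsets ${r.fromOffset} to ${r.untilOffset}")
      }
    }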
Re: Rebalancing when adding kafka partitions
Scaladoc is already in the code, just not the HTML docs.

On Fri, Jul 22, 2016 at 1:46 PM, Srikanth wrote:
> Yeah, that's what I thought. We need to redefine, not just restart.
> Thanks for the info!
Re: Rebalancing when adding kafka partitions
Yeah, that's what I thought. We need to redefine, not just restart.
Thanks for the info!

I do see the usage of Subscribe[K, V] in your DStreams example.
Looks simple, but it's not very obvious how it works :-)
I'll watch out for the docs and ScalaDoc.

Srikanth

On Fri, Jul 22, 2016 at 2:15 PM, Cody Koeninger wrote:
> No, restarting from a checkpoint won't do it, you need to re-define the
> stream.
Re: Rebalancing when adding kafka partitions
No, restarting from a checkpoint won't do it; you need to re-define the
stream.

Here's the JIRA for the 0.10 integration:

https://issues.apache.org/jira/browse/SPARK-12177

I haven't gotten docs completed yet, but there are examples at:

https://github.com/koeninger/kafka-exactly-once/tree/kafka-0.10

On Fri, Jul 22, 2016 at 1:05 PM, Srikanth wrote:
> In Spark 1.x, if we restart from a checkpoint, will it read from new
> partitions?
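A sketch of what "re-define the stream" means in practice; conf, kafkaParams, and the topic name are placeholders, and this mirrors the style of the linked examples rather than copying them. Recovering via StreamingContext.getOrCreate replays the stream definition stored in the checkpoint, while re-defining means constructing the context and stream from current code:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    // Re-defining the stream: build a fresh context and stream from code.
    // (By contrast, StreamingContext.getOrCreate(checkpointDir, ...) would
    // deserialize the previously checkpointed stream definition, keeping
    // its original partition view.)
    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(conf, Seconds(5))
      val stream = KafkaUtils.createDirectStream[String, String](
        ssc, PreferConsistent, Subscribe[String, String](Seq("events"), kafkaParams))
      stream.map(r => (r.key, r.value)).print()
      ssc
    }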
Re: Rebalancing when adding kafka partitions
In Spark 1.x, if we restart from a checkpoint, will it read from new
partitions?

If you can, please point us to some doc/link that talks about the Kafka
0.10 integration in Spark 2.0.

On Fri, Jul 22, 2016 at 1:33 PM, Cody Koeninger wrote:
> For the integration for Kafka 0.8, you are literally starting a
> streaming job against a fixed set of topic-partitions.
Re: Rebalancing when adding kafka partitions
For the integration for Kafka 0.8, you are literally starting a
streaming job against a fixed set of topic-partitions. It will not
change throughout the job, so you'll need to restart the Spark job if
you change Kafka partitions.

For the integration for Kafka 0.10 / Spark 2.0, if you use Subscribe
or SubscribePattern, it should pick up new partitions as they are
added.

On Fri, Jul 22, 2016 at 11:29 AM, Srikanth wrote:
> Hello,
>
> I'd like to understand how Spark Streaming (direct) would handle Kafka
> partition addition. Will a running job be aware of new partitions and
> read from them, given that it uses Kafka APIs to query offsets and
> offsets are handled internally?
>
> Srikanth
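To make the contrast concrete, a hedged side-by-side sketch; broker address, topic, and key/value types are placeholders, and each half assumes its respective artifact (spark-streaming-kafka-0-8 or spark-streaming-kafka-0-10) is on the classpath:

    // Kafka 0.8 direct stream: the topic-partition set is resolved once,
    // at creation, and never changes for the life of the job.
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils => KafkaUtils08}

    val stream08 = KafkaUtils08.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      Map("metadata.broker.list" -> "localhost:9092"),
      Set("events")
    )

    // Kafka 0.10 direct stream: Subscribe delegates assignment to the new
    // consumer, which can pick up partitions added while the job runs.
    import org.apache.spark.streaming.kafka010.{KafkaUtils => KafkaUtils10}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val stream10 = KafkaUtils10.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("events"), kafkaParams)
    )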