Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation
Python version has been available since 1.4. It should be close to feature parity with the JVM version in 1.5.

On Tue, Aug 18, 2015 at 9:36 AM, ayan guha wrote:
> Hi Cody
>
> An unrelated question: any idea when the Python version of the direct
> receiver is expected? I'm personally looking forward to it :)
Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation
Hi Cody

An unrelated question: any idea when the Python version of the direct receiver is expected? I'm personally looking forward to it :)

On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger wrote:
> The solution you found is also in the docs:
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> Java uses an atomic reference because Java doesn't allow you to close over
> non-final references.
>
> I'm not clear on your other question.

--
Best Regards,
Ayan Guha
Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation
The solution you found is also in the docs:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

Java uses an atomic reference because Java doesn't allow you to close over non-final references.

I'm not clear on your other question.

On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak wrote:
> The solution for how to share offsetRanges after the DirectKafkaInputStream
> is transformed is in:
>
> https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
>
> https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
>
> One thing I would like to understand is why the Scala version uses a normal
> variable while the Java version uses an AtomicReference.
>
> Another thing I don't get is about closure serialization: why doesn't the
> logger in the code below throw an NPE even though its instance isn't copied
> the way offsetRanges is? When val offsets = offsetRanges is removed from
> foreachRDD, mapPartitionsWithIndex throws on offsets(idx).
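For reference, a minimal sketch of the pattern from that docs page: grab the offsets in transform (which runs on the driver each batch, before any downstream transformation of the same RDD) and read them in foreachRDD. Here ssc, kafkaParams, and the topic name are assumed to exist, and the String decoders stand in for whatever deserialization the job actually uses:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))

// Plain var on the driver; transform's body runs on the driver per batch.
var offsetRanges = Array[OffsetRange]()

stream.transform { rdd =>
  // Only the first RDD in the chain is still a KafkaRDD, so cast here.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map(_._2).foreachRDD { rdd =>
  // Driver-side: offsetRanges now holds this batch's ranges.
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}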
Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation
The solution for how to share offsetRanges after the DirectKafkaInputStream is transformed is in:

https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala

https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

One thing I would like to understand is why the Scala version uses a normal variable while the Java version uses an AtomicReference.

Another thing I don't get is about closure serialization: why doesn't the logger in the code below throw an NPE even though its instance isn't copied the way offsetRanges is? When val offsets = offsetRanges is removed from foreachRDD, mapPartitionsWithIndex throws on offsets(idx). I have something like this code:

object StreamOps {

  val logger = LoggerFactory.getLogger("StreamOps")
  var offsetRanges = Array[OffsetRange]()

  def transform[F](stream: InputDStream[Array[Byte]]): DStream[F] = {
    stream transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd flatMap { message =>
        Try(... decode Array[Byte] to F ...) match {
          case Success(fact) => Some(fact)
          case _ => None
        }
      }
    }
  }

  // Error handling removed for brevity
  def save[F](stream: DStream[F]): Unit = {
    stream foreachRDD { rdd =>
      // It has to be here, otherwise NullPointerException
      val offsets = offsetRanges

      rdd mapPartitionsWithIndex { (idx, facts) =>
        // Use offsets here
        val writer = new MyWriter[F](offsets(idx), ...)

        facts foreach { fact =>
          writer.write(fact)
        }

        writer.close()

        // Why does logger work and not throw NullPointerException?
        logger.info(...)

        Iterator.empty
      } foreach { (_: Nothing) => }
    }
  }
}

Many thanks for any advice; I'm sure it's a noob question.
Petr

On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak wrote:
> Or can I generally create a new RDD from a transformation and enrich its
> partitions with some metadata, so that I would copy OffsetRanges into my
> new RDD in the DStream?
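One plausible reading of the difference, sketched under standard Spark closure-serialization semantics (ClosureDemo and the field names are illustrative): a field of a Scala object is not serialized with the closure at all; tasks read it from the executor JVM's own copy of the singleton. That copy runs the object initializer when it loads, so logger is non-null on executors, while offsetRanges, assigned only in driver-side code, stays at its initial empty value there. A local val, by contrast, is captured and shipped inside the task closure:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.OffsetRange
import org.slf4j.LoggerFactory

object ClosureDemo {
  // Initialized on every JVM that loads the singleton, including executors,
  // so the reference is never null inside tasks.
  val logger = LoggerFactory.getLogger("ClosureDemo")

  // Assigned only on the driver; the executor's copy of the singleton
  // keeps the initial empty array.
  var offsetRanges = Array[OffsetRange]()

  def save[F](stream: DStream[F]): Unit = {
    stream foreachRDD { rdd =>
      // Local val: the driver's current value is serialized into the task closure.
      val offsets = offsetRanges

      rdd foreachPartition { _ =>
        // offsets is the shipped snapshot; referencing ClosureDemo.offsetRanges
        // here would read the executor's (empty) copy instead.
        logger.info(s"partition sees ${offsets.length} offset ranges")
      }
    }
  }
}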
Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation
Or can I generally create a new RDD from a transformation and enrich its partitions with some metadata, so that I would copy OffsetRanges into my new RDD in the DStream?

On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak wrote:
> Hi all,
> I need to transform KafkaRDD into a new stream of deserialized case
> classes. I want to use the new stream to save it to a file and to perform
> additional transformations on it.
>
> To save it I want to use offsets in filenames, hence I need OffsetRanges
> in the transformed RDD. But KafkaRDD is private, so I don't know how to
> do it.
>
> Alternatively, I could deserialize directly in messageHandler before
> KafkaRDD, but that seems to be a 1:1 transformation, while I need to drop
> bad messages (KafkaRDD => RDD would be a flatMap).
>
> Is there a way to do it using messageHandler, or is there another
> approach?
>
> Many thanks for any help.
> Petr
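As a rough sketch of the messageHandler alternative raised above (Fact, decode, ssc, and kafkaParams are placeholders, and the starting offsets are hard-coded for illustration): the handler must return exactly one record per message, so one workaround for bad messages is to return Option[Fact] and flatten afterwards. The flatten is an ordinary flatMap, though, so the resulting RDDs no longer carry HasOffsetRanges; offsets still have to be read from the first, untransformed RDDs:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.util.Try

case class Fact(payload: String)

// Placeholder decoder; None marks a message that should be dropped.
def decode(bytes: Array[Byte]): Option[Fact] =
  Try(Fact(new String(bytes, "UTF-8"))).toOption

val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L)

val messageHandler =
  (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => decode(mmd.message())

// R = Option[Fact]: still 1:1, one Option per Kafka message.
val stream = KafkaUtils.createDirectStream[
  Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Option[Fact]](
  ssc, kafkaParams, fromOffsets, messageHandler)

// After this flatMap the RDDs are plain RDD[Fact] without HasOffsetRanges.
val facts = stream.flatMap(_.toSeq)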