Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-18 Thread Cody Koeninger
The Python version has been available since 1.4. It should be close to feature
parity with the JVM version in 1.5.


Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-18 Thread ayan guha
Hi Cody

An unrelated question: any idea when the Python version of the direct receiver is
expected? I'm personally looking forward to it :)

-- 
Best Regards,
Ayan Guha


Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-18 Thread Cody Koeninger
The solution you found is also in the docs:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

Java uses an atomic reference because Java doesn't allow you to close over
non-final references.

I'm not clear on your other question.
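
For reference, here is a minimal sketch of that documented pattern (the broker,
topic, and batch settings below are placeholders, not taken from the docs or
this thread):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object OffsetRangesSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("offsets-sketch"), Seconds(10))

    // Placeholder connection settings.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    // Driver-side holder, assigned inside transform() while the RDD is still the KafkaRDD.
    var offsetRanges = Array.empty[OffsetRange]

    stream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      // Captured on the driver for this batch; usable here, e.g. for logging
      // or persisting the consumed offsets.
      offsetRanges.foreach(o => println(s"${o.topic}/${o.partition}: ${o.fromOffset} -> ${o.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}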


Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-18 Thread Petr Novak
The solution for how to share offsetRanges after the DirectKafkaInputDStream is
transformed is in:
https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

One thing I would like to understand is why the Scala version uses a plain
variable while the Java version uses an AtomicReference.
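
For illustration, the Java suite's holder expressed in Scala would be roughly the
following (a sketch, not code copied from the suite):

import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.streaming.kafka.OffsetRange

object OffsetHolder {
  // A Java anonymous function can only capture (effectively) final locals, so the
  // Java test keeps a final AtomicReference and mutates its contents; in Scala a
  // plain var in the enclosing scope is enough.
  val offsetRanges = new AtomicReference[Array[OffsetRange]](Array.empty)
}

// On the driver, inside transform:  OffsetHolder.offsetRanges.set(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
// On the driver, inside foreachRDD: val current = OffsetHolder.offsetRanges.get()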

Another thing I don't get is about closure serialization. The question is
why logger in the code below doesn't throw an NPE even though its instance
isn't explicitly copied the way offsetRanges is: when val offsets =
offsetRanges is removed from foreachRDD, mapPartitionsWithIndex throws on
offsets(idx). I have something like this code:

import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import org.slf4j.LoggerFactory

import scala.util.{Success, Try}

object StreamOps {

  val logger = LoggerFactory.getLogger("StreamOps")
  var offsetRanges = Array[OffsetRange]()

  def transform[F](stream: InputDStream[Array[Byte]]): DStream[F] = {
    stream transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd flatMap { message =>
        Try(... decode Array[Byte] to F...) match {
          case Success(fact) => Some(fact)
          case _ => None
        }
      }
    }
  }

  // Error handling removed for brevity
  def save[F](stream: DStream[F]): Unit = {
    stream foreachRDD { rdd =>
      // It has to be here, otherwise NullPointerException
      val offsets = offsetRanges

      rdd mapPartitionsWithIndex { (idx, facts) =>
        // Use offsets here
        val writer = new MyWriter[F](offsets(idx), ...)

        facts foreach { fact =>
          writer.write(fact)
        }

        writer.close()

        // Why does logger work and not throw NullPointerException?
        logger.info(...)

        Iterator.empty
      } foreach {
        (_: Nothing) =>
      }
    }
  }
}

Many thanks for any advice; I'm sure it's a noob question.
Petr


Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-17 Thread Petr Novak
Or can I, in general, create a new RDD from a transformation and enrich its
partitions with some metadata, so that I could carry the OffsetRanges over
into my new RDD in the DStream?

On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak  wrote:

> Hi all,
> I need to transform a KafkaRDD into a new stream of deserialized case
> classes. I want to use the new stream both to save it to a file and to
> perform additional transformations on it.
>
> To save it I want to use offsets in the filenames, hence I need OffsetRanges
> in the transformed RDD. But KafkaRDD is private, so I don't know how to do
> it.
>
> Alternatively, I could deserialize directly in the messageHandler before
> the KafkaRDD is created, but that seems to be a 1:1 transformation, while I
> need to drop bad messages (KafkaRDD => RDD would be a flatMap).
>
> Is there a way to do it using the messageHandler, or is there another
> approach?
>
> Many thanks for any help.
> Petr
>
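
For context, the messageHandler in the 0.8 direct API is applied to each message
individually, which is why it is a 1:1 transformation; a sketch of the alternative
described in the quoted message might look like this (Fact, decode, and the
offsets map are placeholder names, not from this thread):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.util.Try

object MessageHandlerSketch {
  // Placeholder record type and decoder.
  case class Fact(payload: String)
  def decode(bytes: Array[Byte]): Fact = Fact(new String(bytes, "UTF-8"))

  def createStream(ssc: StreamingContext,
                   kafkaParams: Map[String, String],
                   fromOffsets: Map[TopicAndPartition, Long]): DStream[Fact] = {
    // The handler runs once per message, so a bad record can only be wrapped here
    // (e.g. in an Option) and dropped by a later flatMap on the resulting stream.
    val messageHandler =
      (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => Try(decode(mmd.message())).toOption

    val stream = KafkaUtils.createDirectStream[
      Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Option[Fact]](
      ssc, kafkaParams, fromOffsets, messageHandler)

    // Dropping the Nones yields a plain DStream; any HasOffsetRanges cast has to
    // happen before this step, as in the documented pattern.
    stream.flatMap(_.toList)
  }
}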