Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-19 Thread Tzu-Li (Gordon) Tai
Our parser.parse() function has a one-to-one mapping between an input byte[] 
and a List<SuperclassEvent>
Ideally, this should be handled within the KeyedDeserializationSchema passed to 
your Kafka consumer. That would then avoid the need for an extra “parser map 
function” after the source.

Were you suggesting a flatMap instead of map at this stage of 
calling our parser, or did you mean to use a flatMap() after the parser and 
before the split()? 
I meant a flatMap after the parser (whether it’s done as a map function or 
within the Kafka source) and before the split. The flatMap function iterates 
through your per-record lists and collects each element as it goes.
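
As a minimal sketch under assumptions (the stream and variable names are made 
up), with the parser output as a DataStream<List<SuperclassEvent>> called 
parsedStream:

import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

DataStream<SuperclassEvent> events = parsedStream
    .flatMap(new FlatMapFunction<List<SuperclassEvent>, SuperclassEvent>() {
        @Override
        public void flatMap(List<SuperclassEvent> eventList,
                            Collector<SuperclassEvent> out) {
            for (SuperclassEvent event : eventList) {
                // each collect() is handed straight to the chained
                // downstream operators
                out.collect(event);
            }
        }
    });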

- Gordon




On 18 July 2017 at 3:02:45 AM, earellano (eric.arell...@ge.com) wrote:

Tzu-Li (Gordon) Tai wrote  
> Basically, when two operators are chained together, the output of the  
> first operator is immediately chained to the processElement of the next  
> operator; it’s therefore just a consecutive invocation of processElements  
> on the chained operators. There will be no thread-to-thread handover or  
> buffering.  

Okay great, chaining tasks does sound like what we want then.  



Tzu-Li (Gordon) Tai wrote  
> In that case, I would suggest using flatMap here, followed by chained  
> splits and then sinks.  

We changed our code to roughly follow this suggestion, but I'm not sure 
we're doing this correctly. Is there a better way you'd recommend chaining the 
tasks? As written below, are individual Events within the List<SuperclassEvent> 
being sent to their respective sinks right away, or does the whole list have to 
be split first?

[code snippet referenced above not preserved in this archive]

We also had issues getting flatMap to work, and map seemed more appropriate. 
Our parser.parse() function has a one-to-one mapping between an input byte[] 
and a List<SuperclassEvent>, and that never changes, so a map seems to make 
sense to us. Were you suggesting a flatMap instead of map at this stage of 
calling our parser, or did you mean to use a flatMap() after the parser and 
before the split()?





Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-17 Thread earellano
Tzu-Li (Gordon) Tai wrote
> Basically, when two operators are chained together, the output of the
> first operator is immediately chained to the processElement of the next
> operator; it’s therefore just a consecutive invocation of processElements
> on the chained operators. There will be no thread-to-thread handover or
> buffering.

Okay great, chaining tasks does sound like what we want then.



Tzu-Li (Gordon) Tai wrote
> In that case, I would suggest using flatMap here, followed by chained
> splits and then sinks.

We changed our code to roughly follow this suggestion, but I'm not sure
we're doing this correctly. Is there a better way you'd recommend chaining the
tasks? As written below, are individual Events within the List<SuperclassEvent>
being sent to their respective sinks right away, or does the whole list have to
be split first?

[code snippet referenced above not preserved in this archive]

We also had issues getting flatMap to work, and map seemed more appropriate.
Our parser.parse() function has a one-to-one mapping between an input byte[]
and a List<SuperclassEvent>, and that never changes, so a map seems to make
sense to us. Were you suggesting a flatMap instead of map at this stage of
calling our parser, or did you mean to use a flatMap() after the parser and
before the split()?





Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-17 Thread Tzu-Li (Gordon) Tai
With task chaining as you're saying, could you help clarify how it works 
please?
Operators can be chained to be executed by a single task thread. See [1] for 
more details on that.

Basically, when two operators are chained together, the output of the first 
operator is immediately chained to the processElement of the next operator; 
it’s therefore just a consecutive invocation of processElements on the chained 
operators. There will be no thread-to-thread handover or buffering.

For example, a 
byte[] record can return from our parser a List of 10 SuccessEvents and 1 
ErrorEvent; we want to publish each Event immediately.
In that case, I would suggest using flatMap here, followed by chained splits 
and then sinks.

Using flatMap, you can collect elements as you iterate through the list 
(i.e. `collector.collect(...)`). If the sinks are properly chained (which 
should be the case if there is no keyBy before the sink and you haven’t 
explicitly configured otherwise [2]), then for each .collect(...) the sink 
write will be invoked as part of the chain.

Effectively, this would then be writing to Kafka / Cassandra for every element 
as you iterate through that list (happening in the same thread since everything 
is chained), and matches what you have in mind.
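
To make that concrete, here is a rough sketch of the split followed by the 
chained sinks (the topic, schema, and sink variable names are placeholders, and 
I'm leaving the Cassandra side abstract since I don't know your exact sink 
setup):

import java.util.Collections;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitStream;

SplitStream<SuperclassEvent> split = events.split(new OutputSelector<SuperclassEvent>() {
    @Override
    public Iterable<String> select(SuperclassEvent event) {
        // route each element by its concrete type
        return Collections.singletonList(
            event instanceof ErrorEvent ? "error" : "success");
    }
});

// both sinks remain chained to the flatMap, so every collected element
// reaches its sink immediately
split.select("error")
    .addSink(new FlinkKafkaProducer010<>("error-topic", schema, props));
split.select("success")
    .addSink(cassandraSink);  // placeholder for however you build your Cassandra sink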

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html#tasks-and-operator-chains
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups

On 17 July 2017 at 2:06:52 PM, earellano (eric.arell...@ge.com) wrote:

Hi,  

Tzu-Li (Gordon) Tai wrote  
> This seems odd. Are your events intended to be a list? If not, this  
> should be a `DataStream<SuperclassEvent>`.  
>  
> From the code snippet you’ve attached in the first post, it seems like  
> you’ve initialized your source incorrectly.  
>  
> `env.fromElements(List<...>)` will take the whole list as a single event,  
> thus your source is only emitting a single list as a record.  

Ah sorry for the confusion. So the original code snippet isn't our actual  
code - it's a simplified and generified version so that it would be easy to  
reproduce the Null Pointer Exception without having to show our whole code  
base.  

To clarify, our input actually uses a Kafka Consumer that reads a byte[],  
which is then passed to our external library parser, which takes a byte[] and  
converts it into a List<SuperclassEvent>. This is why we have to use  
DataStream<List<SuperclassEvent>>, rather than just DataStream<SuperclassEvent>.  
It's a requirement from the parser we have to use, because each byte[] record  
can create SuccessEvent(s) and/or ErrorEvent(s).  

Our motivation for using the above map & for loop with conditional output  
logic was that we have to work with this whole List<SuperclassEvent> and not  
just individual Events, but we don't want to wait for the whole list to be  
processed before the event at the beginning of the list is outputted. For  
example, a byte[] record can return from our parser a List of 10 SuccessEvents  
and 1 ErrorEvent; we want to publish each Event immediately. Low latency is  
extremely important to us.  

--  

With task chaining as you're saying, could you help clarify how it works  
please? With each record of type List<SuperclassEvent>, and calling the Split  
Operator followed by the sink operators, does that whole record/list have to be  
split before it can then go on to the sink? Or does task chaining mean it  
immediately gets outputted to the sink?  


Thanks so much for all this help by the way!  






Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-16 Thread earellano
Hi, 

Tzu-Li (Gordon) Tai wrote
> This seems odd. Are your events intended to be a list? If not, this
> should be a `DataStream<SuperclassEvent>`.
> 
> From the code snippet you’ve attached in the first post, it seems like
> you’ve initialized your source incorrectly.
> 
> `env.fromElements(List<...>)` will take the whole list as a single event,
> thus your source is only emitting a single list as a record.

Ah sorry for the confusion. So the original code snippet isn't our actual
code - it's a simplified and generified version so that it would be easy to
reproduce the Null Pointer Exception without having to show our whole code
base. 

To clarify, our input actually uses a Kafka Consumer that reads a byte[],
which is then passed to our external library parser, which takes a byte[] and
converts it into a List<SuperclassEvent>. This is why we have to use
DataStream<List<SuperclassEvent>>, rather than just DataStream<SuperclassEvent>.
It's a requirement from the parser we have to use, because each byte[] record
can create SuccessEvent(s) and/or ErrorEvent(s).

Our motivation for using the above map & for loop with conditional output
logic was that we have to work with this whole List<SuperclassEvent> and not
just individual Events, but we don't want to wait for the whole list to be
processed before the event at the beginning of the list is outputted. For
example, a byte[] record can return from our parser a List of 10 SuccessEvents
and 1 ErrorEvent; we want to publish each Event immediately. Low latency is
extremely important to us.

--

With task chaining as you're saying, could you help clarify how it works
please? With each record of type List<SuperclassEvent>, and calling the Split
Operator followed by the sink operators, does that whole record/list have to
be split before it can then go on to the sink? Or does task chaining mean it
immediately gets outputted to the sink?


Thanks so much for all this help by the way!






Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-16 Thread Tzu-Li (Gordon) Tai
Hi,

void output(DataStream<List<SuperclassEvent>> inputStream) {
This seems odd. Are your events intended to be a list? If not, this should be 
a `DataStream<SuperclassEvent>`.

From the code snippet you’ve attached in the first post, it seems like you’ve 
initialized your source incorrectly.

`env.fromElements(List<...>)` will take the whole list as a single event, thus 
your source is only emitting a single list as a record. Perhaps what you 
actually want to do here is `env.fromCollection(List<...>)`?

This should also eliminate the situation that “only after the whole List is 
processed can the records then be sent to their respective sinks”, as you 
mentioned in your reply.
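
A toy example of the difference (note that for generic list types, Flink's type 
extraction may additionally need an explicit TypeInformation hint):

List<SuperclassEvent> parsed = ...;  // e.g. one parser output

// emits ONE record, whose value is the entire list:
DataStream<List<SuperclassEvent>> oneRecord = env.fromElements(parsed);

// emits one record PER list element:
DataStream<SuperclassEvent> manyRecords = env.fromCollection(parsed);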

but this doesn't seem very ideal to us because it requires a new operator to 
first split the stream
IMO, this wouldn’t really introduce noticeable overhead, as the operator will 
be chained to the map operator. Side outputs are also the preferred way here, as 
side outputs subsume stream splitting.



Overall, I think it is reasonable to do a map -> split -> Kafka / Cassandra 
sinks in your case, given that you’ve declared the source correctly to emit a 
single SuperclassEvent per record.

The operator overhead is fairly trivial if it is chained. Another reason to use 
sinks properly is that only then will you benefit from the exactly-once / 
at-least-once delivery guarantees to external systems (which requires 
collaboration between the sink and Flink’s checkpointing).
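
For example, those guarantees only take effect once checkpointing is enabled on 
the job (the interval below is arbitrary):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);  // checkpoint every 5 seconds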

Hope this helps!

Cheers,
Gordon


On 17 July 2017 at 2:59:38 AM, earellano [via Apache Flink User Mailing List 
archive.] (ml+s2336050n14294...@n4.nabble.com) wrote:

Tzu-Li (Gordon) Tai wrote
It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or is 
there any specific reason why you want to emit elements to Kafka in a map 
function?

The correct way to use it is to add it as a sink function to your pipeline, i.e.

DataStream<String> someStream = …
someStream.addSink(new FlinkKafkaProducer010<>(“topic”, schema, props));
// or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream, “topic”, 
schema, props);
The reason we want to use processElement() & a map function, instead of 
someStream.addSink(), is that our output logic is conditional on the type of 
record we have.

Our overall program follows this path:

  1. Serialized JSON consumed from Kafka: DataStream<byte[]>
  2. Parsed, producing a List of successful events and/or error events: 
DataStream<List<SuperclassEvent>>
  3. Outputted conditionally, going to Kafka or Cassandra depending on which 
type of event it is.


This is our code for the output logic (with types modified so as not to expose our IP):

void output(DataStream<List<SuperclassEvent>> inputStream) {
    inputStream.map(eventList -> {
        for (SuperclassEvent event : eventList) {
            if (event instanceof SuccessEvent)
                emitToCassandra(event);
            else if (event instanceof ErrorEvent)
                emitToKafka(event);
        }
        return true;  // we don't actually want to return anything, just
                      // don't know how else to use map
    });
}

 
That is, we have sinks for both Kafka and Cassandra, and want to be able to 
iterate through our List<SuperclassEvent> and conditionally send each 
individual record to its appropriate sink depending on its type.

I know Flink offers SplitStreams for a similar purpose, but this doesn't seem 
very ideal to us because it requires a new operator to first split the stream, 
and only after the whole List is processed can the records then be sent to 
their respective sinks. The code above, by contrast, sends each record to its 
sink immediately upon determining its type.  

--

Is there any way to make processElement() work so that we can work on 
individual records instead of the whole DataStream? Or are we misusing Flink? 
What would you recommend as the best way to do this?


--

Also, if processElement() and invoke() aren't meant to be used, should they be 
made package private? Happy to make a pull request if so, although fear that 
might break a few things.



Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-16 Thread earellano
Tzu-Li (Gordon) Tai wrote
> It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or
> is there any specific reason why you want to emit elements to Kafka in a
> map function?
> 
> The correct way to use it is to add it as a sink function to your
> pipeline, i.e.
> 
> DataStream<String> someStream = …
> someStream.addSink(new FlinkKafkaProducer010<>(“topic”, schema, props));
> // or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream,
> “topic”, schema, props);

The reason we want to use processElement() & a map function, instead of
someStream.addSink(), is that our output logic is conditional on the type of
record we have.

Our overall program follows this path:

  1. Serialized JSON consumed from Kafka: DataStream<byte[]>
  2. Parsed, producing a List of successful events and/or error events:
DataStream<List<SuperclassEvent>>
  3. Outputted conditionally, going to Kafka or Cassandra depending on which
type of event it is.


This is our code for the output logic (with types modified so as not to expose
our IP):

void output(DataStream<List<SuperclassEvent>> inputStream) {
    inputStream.map(eventList -> {
        for (SuperclassEvent event : eventList) {
            if (event instanceof SuccessEvent)
                emitToCassandra(event);
            else if (event instanceof ErrorEvent)
                emitToKafka(event);
        }
        return true;  // we don't actually want to return anything, just
                      // don't know how else to use map
    });
}

 
That is, we have sinks for both Kafka and Cassandra, and want to be able to
iterate through our List<SuperclassEvent> and conditionally send each
individual record to its appropriate sink depending on its type.

I know Flink offers SplitStreams for a similar purpose, but this doesn't
seem very ideal to us because it requires a new operator to first split the
stream, and only after the whole List is processed can the records then be
sent to their respective sinks. The code above, by contrast, sends each
record to its sink immediately upon determining its type.  

--

Is there any way to make processElement() work so that we can work on
individual records instead of the whole DataStream? Or are we misusing
Flink? What would you recommend as the best way to do this?


-- 

Also, if processElement() and invoke() aren't meant to be used, should they
be made package private? Happy to make a pull request if so, although fear
that might break a few things.





Re: Kafka Producer - Null Pointer Exception when processing by element

2017-07-14 Thread Tzu-Li (Gordon) Tai
Hi,

It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or is 
there any specific reason why you want to emit elements to Kafka in a map 
function?

The correct way to use it is to add it as a sink function to your pipeline, i.e.

DataStream<String> someStream = …
someStream.addSink(new FlinkKafkaProducer010<>(“topic”, schema, props));
// or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream, “topic”, 
schema, props);
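
For completeness, a minimal end-to-end sketch (the topic name, server address, 
and elements are placeholders):

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> someStream = env.fromElements("a", "b", "c");
someStream.addSink(new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), props));
env.execute();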

The processElement() method is invoked internally by the system, and isn’t 
intended to be invoked by user code.

See [1] for more details.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producer

On 15 July 2017 at 3:35:32 AM, earellano (eric.arell...@ge.com) wrote:

I'm getting a NullPointerException when calling  
FlinkKafkaProducer010.processElement(StreamRecord). Specifically, this comes  
from its helper function invokeInternally(): the function's internalProducer  
is not configured properly, so a null value is passed to one of its helper  
functions.  

We'd really appreciate you taking a look below to see whether this is a Flink  
bug or something we're doing wrong.  

Our code  

This is a simplified version of our program; the inline snippet was not  
preserved in this archive, but you can copy the code here to reproduce it  
locally: https://pastebin.com/Li8iZuFj  

Stack trace  

Here is the stack trace (the inline paste was not preserved in this archive).  

What causes the error in Flink's code  

The method processElement() calls invokeInternally(). Within  
invokeInternally(), Flink tries to parse variable values, e.g. topic name  
and partitions.  

The app fails when trying to resolve the partitions. Specifically, the  
method to resolve the partitions has a KafkaProducer parameter, which is  
passed as null, resulting in the NullPointerException. (The highlighted  
debugger view referenced here was an inline image that is not preserved in  
this archive.)  

So, I think the issue is that the internalProducer is not being set up  
correctly. Namely, its producer field is never set, so it stays null and then  
gets passed around, resulting in the NullPointerException.  

Bug? Or issue with our code?  

My question to you all is whether this is a bug that needs to be fixed, or  
whether it results from us improperly configuring our program. The above code  
shows our configuration within the program itself (just setting  
bootstrap.servers), and we created the Kafka topic on our local machine as  
follows:  

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor  
1 --partitions 1 --topic process-elements-tests  



Any help greatly appreciated! We're really hoping to get processElement()  
to work, because our streaming architecture requires working on individual  
elements rather than the entire data stream (sink behavior depends on the  
individual values within each record of our DataStream<List<SuperclassEvent>>).  




