Re: KafkaIO Exactly once vs At least Once

2020-06-22 Thread Eleanore Jin
Hi Alexey,

Thanks for the response; below are some of my follow-up questions:

Is the KafkaIO EOS sink you referred to KafkaExactlyOnceSink? If so, the way
I understand it, it is used via KafkaIO.write().withEOS(numPartitionsForSinkTopic,
"mySinkGroupId"). Reading your response, do I need to additionally
configure the KafkaProducer property enable.idempotence=true, or do I only
need to configure that property?

The configuration I have tried so far:
- from FlinkRunnerOptions:

   1. enable checkpoints: options.setCheckpointingInterval(10_000L);
   2. set the state backend to Filesystem:
   options.setStateBackendFactory(FsBackendFactory.class);
   3. optionally, set the number of retries on pipeline failure before the
   application exits: options.setNumberOfExecutionRetries(2);


- from KafkaIO.read():

   1. set enable.auto.commit to false in the Kafka ConsumerConfig
   2. set the Kafka ConsumerConfig isolation.level to read_committed via Beam
   KafkaIO.Read: withReadCommitted()
   3. commit finalized offsets back to Kafka when a checkpoint is finalized:
   commitOffsetsInFinalize()

- from KafkaIO.write(), I only configured:
withEOS(numPartitionsForSinkTopic, "mySinkGroupId")
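
For concreteness, here is a rough sketch of how I wired the three pieces
together. This is a minimal sketch of my setup, not a reference recipe: the
broker address, topic names, shard count, and String serializers are
placeholders, and FsBackendFactory is my own state backend factory class
(shown commented out since it is not part of Beam):

    import java.util.Collections;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class EosPipelineSketch {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setCheckpointingInterval(10_000L);     // 1. enable checkpoints
        // 2. filesystem state backend via my own factory class:
        // options.setStateBackendFactory(FsBackendFactory.class);
        options.setNumberOfExecutionRetries(2);        // 3. retries before exiting

        Pipeline p = Pipeline.create(options);
        int numPartitionsForSinkTopic = 4;             // placeholder value

        p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")   // placeholder
                .withTopic("source-topic")             // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // 1. disable auto-commit on the consumer
                .withConsumerConfigUpdates(Collections.singletonMap(
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, (Object) false))
                .withReadCommitted()          // 2. isolation.level=read_committed
                .commitOffsetsInFinalize()    // 3. commit offsets on checkpoint finalize
                .withoutMetadata())           // yields PCollection<KV<String, String>>
            // ... transforms ...
            .apply(KafkaIO.<String, String>write()
                .withBootstrapServers("broker:9092")   // placeholder
                .withTopic("sink-topic")               // placeholder
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
                .withEOS(numPartitionsForSinkTopic, "mySinkGroupId"));

        p.run().waitUntilFinish();
      }
    }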

With the above settings, the current observation is: when I inject an
artificial Exception, the Beam job restarts and there is no message loss, but
messages do not show up in the output topic until the checkpoint finishes, so
the latency depends on the checkpoint interval.

So can you please correct me if the above settings are not optimal, and is
there any way to reduce the latency that checkpointing introduces for EOS?

Thanks a lot!
Eleanore


On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko wrote:

> I think you don’t need to enable EOS in this case since KafkaIO has a
> dedicated EOS-sink implementation for the Beam part (btw, it’s not supported
> by all runners), and it relies on setting “enable.idempotence=true” for
> KafkaProducer.
> I’m not sure that you can achieve “at least once” semantics with the
> current KafkaIO implementation.
>
> On 16 Jun 2020, at 17:16, Eleanore Jin  wrote:
>
> Hi All,
>
> I previously asked a few questions regarding enabling EOS (exactly-once
> semantics); please see below.
>
> Our Beam pipeline uses KafkaIO to read from a source topic, and then uses
> KafkaIO to publish to a sink topic.
>
> According to Max's answer to my previous questions, enabling EOS with
> KafkaIO will introduce latency: the KafkaIO.ExactlyOnceWriter
> processElement method is called only after all messages within the
> checkpoint interval have been checkpointed. So the latency depends on the
> checkpoint interval.
>
> I just wonder: if I relax the guarantee to at-least-once, do I still need
> to enable EOS on KafkaIO, or is it not required?
> If not, can you please provide some instructions on how it should be done?
>
> Thanks a lot!
> Eleanore
>
> > Thanks for the response! The reason to set up the state backend is to
> > experiment with Kafka EOS with Beam running on Flink. Reading through the
> > code and this PR , can
> > you please help me clarify my understanding?
> >
> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
> > EOS; the ExactlyOnceWriter processElement method is annotated
> > with @RequiresStableInput, so all messages will be cached
> > by KeyedBufferingElementsHandler, and only after a checkpoint succeeds
> > will those messages be processed by ExactlyOnceWriter?
>
> That's correct.
>
> >
> > 2. Upon checkpoint, will those messages cached by
> > KeyedBufferingElementsHandler also be checkpointed?
>
> Yes, the buffered elements will be checkpointed.
>
> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
> > stream processing, and the delay is based on the checkpoint interval? How
> > can I reduce the latency while still having the EOS guarantee?
>
> Indeed, the checkpoint interval and the checkpoint duration limit the
> latency. Given the current design and the guarantees, there is no other
> way to influence the latency.
>
> > 4. commitOffsetsInFinalize is also enabled. Does this mean that upon a
> > successful checkpoint, the checkpointed offset will be committed back to
> > Kafka? And if that commit does not finish successfully, the job gets
> > cancelled/stopped, and the job is re-submitted (with the same consumer
> > group for the source topics but a different jobID), is it possible that
> > duplicated processing still occurs, because the consumed offset was not
> > committed back to Kafka?
>
> This option is for the Kafka consumer. AFAIK this is just a convenience
> method to commit the latest checkpointed offset to Kafka. This offset is
> not used when restoring from a checkpoint. However, if you don't restore
> from a checkpoint, you can resume from that offset, which might be
> convenient or not, depending on your use case.
>
>
>


Re: Designing an existing pipeline in Beam

2020-06-22 Thread Praveen K Viswanathan
Another way to put this question is: how do we write a Beam pipeline for an
existing pipeline (in Java) that has a dozen custom objects, where you have
to work with multiple HashMaps of those custom objects in order to transform
the data? Currently, I am writing a Beam pipeline using the same custom
objects, getters and setters, and HashMaps, *but inside a DoFn*. Is this the
optimal way, or does Beam offer something else?
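
To make the question concrete, here is a rough sketch of the alternative I am
weighing: passing one of the HashMaps as a side input instead of rebuilding
it inside the DoFn. All types and names below are placeholders standing in
for the real custom objects, not code from the existing pipeline:

    import java.util.Map;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    public class SideInputSketch {
      static PCollection<String> enrich(
          PCollection<String> inputPc, PCollection<KV<String, String>> map2Pc) {
        // Materialize the "template" map once as a read-only side input.
        PCollectionView<Map<String, String>> map2View =
            map2Pc.apply(View.asMap());
        return inputPc.apply(
            ParDo.of(new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Map<String, String> map2 = c.sideInput(map2View);
                    // Stand-in for the real enrichment logic.
                    c.output(c.element() + "|"
                        + map2.getOrDefault(c.element(), ""));
                  }
                })
                .withSideInputs(map2View));
      }
    }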

On Mon, Jun 22, 2020 at 3:47 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hi Luke,
>
> We can say Map 2 is a kind of template used to enrich the data in Map 1. As
> I mentioned in my previous post, this is a high-level scenario.
>
> All this logic is spread across several classes (with ~4K lines of code
> in total). As in any Java application,
>
> 1. The code has been modularized with multiple method calls
> 2. HashMaps are passed around as arguments to each method
> 3. The attributes of the custom objects are accessed using getters and setters.
>
> This is a common pattern in a normal Java application, but I have not seen
> an example of such code in Beam.
>
>
> On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:
>
>> Who reads map 1?
>> Can it be stale?
>>
>> It is unclear what you are trying to do in parallel and why you wouldn't
>> stick all this logic into a single DoFn / stateful DoFn.
>>
>> On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
>> harish.prav...@gmail.com> wrote:
>>
>>> Hello Everyone,
>>>
>>> I am in the process of implementing an existing pipeline (written using
>>> Java and Kafka) in Apache Beam. The data from the source stream is
>>> contrived and has to go through several steps of enrichment using REST API
>>> calls and parsing of JSON data. The key
>>> transformation in the existing pipeline is shown below (a super high-level
>>> flow):
>>>
>>> *Method A*
>>> Calls *Method B*
>>>   Creates *Map 1, Map 2*
>>> Calls *Method C*
>>>  Read *Map 2*
>>>  Create *Map 3*
>>> *Method C*
>>>  Read *Map 3* and
>>>  update *Map 1*
>>>
>>> The Maps we use are multi-level maps, and I am thinking of having a
>>> PCollection for each Map and passing them as side inputs to a DoFn
>>> wherever I have transformations that need two or more Maps. But there are
>>> certain tasks where I want to make sure I am following the right approach,
>>> for instance updating one of the side-input maps inside a DoFn.
>>>
>>> These are my initial thoughts/questions and I would like to get some
>>> expert advice on how we typically design such an interleaved transformation
>>> in Apache Beam. Appreciate your valuable insights on this.
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>


-- 
Thanks,
Praveen K Viswanathan


Re: Designing an existing pipeline in Beam

2020-06-22 Thread Praveen K Viswanathan
Hi Luke,

We can say Map 2 is a kind of template used to enrich the data in Map 1. As I
mentioned in my previous post, this is a high-level scenario.

All this logic is spread across several classes (with ~4K lines of code
in total). As in any Java application,

1. The code has been modularized with multiple method calls
2. HashMaps are passed around as arguments to each method
3. The attributes of the custom objects are accessed using getters and setters.

This is a common pattern in a normal Java application, but I have not seen
an example of such code in Beam.


On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:

> Who reads map 1?
> Can it be stale?
>
> It is unclear what you are trying to do in parallel and why you wouldn't
> stick all this logic into a single DoFn / stateful DoFn.
>
> On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> I am in the process of implementing an existing pipeline (written using
>> Java and Kafka) in Apache Beam. The data from the source stream is
>> contrived and has to go through several steps of enrichment using REST API
>> calls and parsing of JSON data. The key
>> transformation in the existing pipeline is shown below (a super high-level
>> flow):
>>
>> *Method A*
>> Calls *Method B*
>>   Creates *Map 1, Map 2*
>> Calls *Method C*
>>  Read *Map 2*
>>  Create *Map 3*
>> *Method C*
>>  Read *Map 3* and
>>  update *Map 1*
>>
>> The Maps we use are multi-level maps, and I am thinking of having a
>> PCollection for each Map and passing them as side inputs to a DoFn wherever
>> I have transformations that need two or more Maps. But there are certain
>> tasks where I want to make sure I am following the right approach, for
>> instance updating one of the side-input maps inside a DoFn.
>>
>> These are my initial thoughts/questions and I would like to get some
>> expert advice on how we typically design such an interleaved transformation
>> in Apache Beam. Appreciate your valuable insights on this.
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan


Re: KafkaIO Exactly once vs At least Once

2020-06-22 Thread Alexey Romanenko
I think you don’t need to enable EOS in this case since KafkaIO has a dedicated
EOS-sink implementation for the Beam part (btw, it’s not supported by all
runners), and it relies on setting “enable.idempotence=true” for KafkaProducer.
I’m not sure that you can achieve “at least once” semantics with the current
KafkaIO implementation.
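
To illustrate (a minimal sketch under my assumptions, with placeholder
broker/topic names and String serializers, not a confirmed recipe): a plain
KafkaIO.write() without withEOS(), enabling idempotence on the underlying
KafkaProducer, would look roughly like this:

    import java.util.Collections;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    // "input" is a PCollection<KV<String, String>> from upstream transforms.
    input.apply(KafkaIO.<String, String>write()
        .withBootstrapServers("broker:9092")   // placeholder
        .withTopic("sink-topic")               // placeholder
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withProducerConfigUpdates(Collections.singletonMap(
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, (Object) true)));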

> On 16 Jun 2020, at 17:16, Eleanore Jin  wrote:
> 
> Hi All, 
> 
> I previously asked a few questions regarding enabling EOS (exactly-once
> semantics); please see below.
> 
> Our Beam pipeline uses KafkaIO to read from a source topic, and then uses
> KafkaIO to publish to a sink topic.
> 
> According to Max's answer to my previous questions, enabling EOS with
> KafkaIO will introduce latency: the KafkaIO.ExactlyOnceWriter
> processElement method is called only after all messages within the
> checkpoint interval have been checkpointed. So the latency depends on the
> checkpoint interval.
> 
> I just wonder: if I relax the guarantee to at-least-once, do I still need
> to enable EOS on KafkaIO, or is it not required?
> If not, can you please provide some instructions on how it should be done?
> 
> Thanks a lot!
> Eleanore
> 
> > Thanks for the response! The reason to set up the state backend is to
> > experiment with Kafka EOS with Beam running on Flink. Reading through the
> > code and this PR , can
> > you please help me clarify my understanding?
> >
> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
> > EOS; the ExactlyOnceWriter processElement method is annotated
> > with @RequiresStableInput, so all messages will be cached
> > by KeyedBufferingElementsHandler, and only after a checkpoint succeeds
> > will those messages be processed by ExactlyOnceWriter?
> 
> That's correct.
> 
> >
> > 2. Upon checkpoint, will those messages cached by
> > KeyedBufferingElementsHandler also be checkpointed?
> 
> Yes, the buffered elements will be checkpointed.
> 
> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
> > stream processing, and the delay is based on the checkpoint interval? How
> > can I reduce the latency while still having the EOS guarantee?
> 
> Indeed, the checkpoint interval and the checkpoint duration limit the
> latency. Given the current design and the guarantees, there is no other
> way to influence the latency.
> 
> > 4. commitOffsetsInFinalize is also enabled. Does this mean that upon a
> > successful checkpoint, the checkpointed offset will be committed back to
> > Kafka? And if that commit does not finish successfully, the job gets
> > cancelled/stopped, and the job is re-submitted (with the same consumer
> > group for the source topics but a different jobID), is it possible that
> > duplicated processing still occurs, because the consumed offset was not
> > committed back to Kafka?
> 
> This option is for the Kafka consumer. AFAIK this is just a convenience
> method to commit the latest checkpointed offset to Kafka. This offset is
> not used when restoring from a checkpoint. However, if you don't restore
> from a checkpoint, you can resume from that offset, which might be
> convenient or not, depending on your use case.
> 
> 



Flink/Portable Runner error on AWS EMR

2020-06-22 Thread Jesse Lord
I am trying to run the wordcount quickstart example on a Flink cluster on AWS
EMR (Beam 2.22, Flink 1.10).

I get the following error:

ERROR:root:java.util.ServiceConfigurationError: 
com.fasterxml.jackson.databind.Module: Provider 
com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype

This happens with both the portable runner (using the Python SDK) and the
classic Flink runner using the quickstart Maven project.

I think this error relates to this issue:
https://issues.apache.org/jira/browse/BEAM-9239. Based on the comments on
that issue, I tried adjusting whether Flink prioritizes loading child (user)
jars or parent (Flink) jars in the classpath, but it did not resolve the
issue. For reference, the setting I adjusted is shown below.
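
A minimal excerpt of what I tried in flink-conf.yaml (my understanding of the
knob; "child-first" is Flink's default):

    # flink-conf.yaml
    # child-first (default) lets user jars win; parent-first forces Flink's
    # own jars, including its jackson version, to take precedence.
    classloader.resolve-order: parent-first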

Looking for any suggestions that might help as a workaround, and wondering
whether I should open a new Jira issue or just add a comment to the existing
one (which I have already done).

Thanks,
Jesse


Re: Designing an existing pipeline in Beam

2020-06-22 Thread Luke Cwik
Who reads map 1?
Can it be stale?

It is unclear what you are trying to do in parallel and why you wouldn't
stick all this logic into a single DoFn / stateful DoFn.
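
To illustrate what I mean by a single stateful DoFn, here is a rough sketch.
It assumes keyed String input and a single per-key map; all names and types
are placeholders, not your actual objects:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.coders.MapCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Input must be keyed (KV) for state to apply; each key gets its own map1.
    public class EnrichFn extends DoFn<KV<String, String>, String> {
      @StateId("map1")
      private final StateSpec<ValueState<Map<String, String>>> map1Spec =
          StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

      @ProcessElement
      public void processElement(
          ProcessContext c,
          @StateId("map1") ValueState<Map<String, String>> map1State) {
        Map<String, String> map1 = map1State.read();
        if (map1 == null) {
          map1 = new HashMap<>();
        }
        // Inline the Method B / Method C logic here: build the intermediate
        // maps locally, update map1, and emit the enriched element.
        map1.put(c.element().getKey(), c.element().getValue());
        map1State.write(map1);
        c.output(c.element().getValue());
      }
    }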

On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hello Everyone,
>
> I am in the process of implementing an existing pipeline (written using
> Java and Kafka) in Apache Beam. The data from the source stream is
> contrived and has to go through several steps of enrichment using REST API
> calls and parsing of JSON data. The key
> transformation in the existing pipeline is shown below (a super high-level
> flow):
>
> *Method A*
> Calls *Method B*
>   Creates *Map 1, Map 2*
> Calls *Method C*
>  Read *Map 2*
>  Create *Map 3*
> *Method C*
>  Read *Map 3* and
>  update *Map 1*
>
> The Maps we use are multi-level maps, and I am thinking of having a
> PCollection for each Map and passing them as side inputs to a DoFn wherever
> I have transformations that need two or more Maps. But there are certain
> tasks where I want to make sure I am following the right approach, for
> instance updating one of the side-input maps inside a DoFn.
>
> These are my initial thoughts/questions and I would like to get some
> expert advice on how we typically design such an interleaved transformation
> in Apache Beam. Appreciate your valuable insights on this.
>
> --
> Thanks,
> Praveen K Viswanathan
>


Re: Strange graph generated by beam

2020-06-22 Thread Luke Cwik
It would be helpful if the names of the transforms were visible on the
graph; otherwise it is really hard to understand what each stage and step do.

On Mon, Jun 22, 2020 at 3:53 AM 吴亭  wrote:

> Hi,
>
> We are using Beam to read a stream from Kafka with Spark as the
> runner, and I found that our Spark graph looks very strange.
>
> For each batch stream, it generates 3 stages; one of them is our
> actual work, which I can understand.
>
> The other two are kind of duplicated; you can take a look at the attached
> graphs:
> [image: image.png]
>
>
> [image: image.png]
> You will see that the second graph actually includes the first one; I have
> no idea why it displays two.
>
> Is this correct?
>
> Br,
> Tim
>


Strange graph generated by beam

2020-06-22 Thread 吴亭
Hi,

We are using Beam to read a stream from Kafka with Spark as the runner, and I
found that our Spark graph looks very strange.

For each batch stream, it generates 3 stages; one of them is our actual
work, which I can understand.

The other two are kind of duplicated; you can take a look at the attached
graphs:
[image: image.png]


[image: image.png]
You will see that the second graph actually includes the first one; I have no
idea why it displays two.

Is this correct?

Br,
Tim