Re: Multiple Sinks for a Single Source

2020-06-04 Thread Piotr Nowojski
Hi Prasanna,

That’s good to hear and thanks for confirming that it works :)

Piotrek

> On 3 Jun 2020, at 16:09, Prasanna kumar wrote:
>
> Piotr and Alexander,
>
> I have fixed the programmatic error in the filter method and it is working now.
>
> Thanks for the detailed help from both of you.

Re: Multiple Sinks for a Single Source

2020-06-03 Thread Prasanna kumar
Piotr and Alexander,

I have fixed the programmatic error in the filter method and it is working now. Thanks for the detailed help from both of you. I am able to add the sinks based on the JSON and create the DAG.

Thanks,
Prasanna.

On Wed, Jun 3, 2020 at 4:51 PM Piotr Nowojski wrote:
> Hi Prasanna,
>

Re: Multiple Sinks for a Single Source

2020-06-03 Thread Piotr Nowojski
Hi Prasanna,

1.
> The object probably contains or references non serializable fields.

That should speak for itself. Flink was not able to distribute your code to the worker nodes: you have used a lambda function that turned out to be non-serializable. You should unit test your code and in
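The serialization failure Piotrek describes can be reproduced without Flink at all, since Flink ships user functions with plain Java serialization. Below is a minimal, hypothetical sketch (the `SerializableFn` interface and variable names are illustrative, not Flink's actual API) showing that a self-contained lambda serializes fine while one capturing a non-serializable object does not:

```java
import java.io.*;
import java.util.function.Function;

public class SerializabilityCheck {
    // Illustrative serializable function interface, analogous in spirit
    // to Flink's function interfaces which extend java.io.Serializable.
    interface SerializableFn<I, O> extends Function<I, O>, Serializable {}

    // Tries to serialize the object with Java serialization, the same
    // mechanism used to distribute user code to worker nodes.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A self-contained lambda serializes fine.
        SerializableFn<String, Integer> ok = String::length;

        // A lambda capturing a non-serializable local does not.
        Object nonSerializableDependency = new Object();
        SerializableFn<String, String> bad =
            s -> s + nonSerializableDependency.hashCode();

        System.out.println(isSerializable(ok));   // true
        System.out.println(isSerializable(bad));  // false
    }
}
```

A unit test calling `isSerializable` on each user function catches this class of error before job submission rather than at deploy time.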

Re: Multiple Sinks for a Single Source

2020-06-02 Thread Prasanna kumar
Hi,

I have an event router registry like this. Reading this as input, I need to create a job that redirects the messages to the correct sink per the condition.

{
  "eventRouterRegistry": [
    { "eventType": "biling", "outputTopic": "billing" },
    { "eventType": "cost", "outputTopic":
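The routing decision in the registry above is just a lookup from event type to output topic. A minimal sketch of that lookup, modeled as a plain in-memory map (in the real job the map would be parsed from the JSON file; the "cost-topic" value here is a hypothetical placeholder, since the original JSON is truncated):

```java
import java.util.*;

public class EventRouterRegistry {
    // In-memory model of the posted eventRouterRegistry JSON.
    static final Map<String, String> REGISTRY = Map.of(
        "billing", "billing",
        "cost", "cost-topic" // hypothetical, the real value is truncated above
    );

    // Returns the output topic for an event type, empty if unregistered.
    static Optional<String> outputTopicFor(String eventType) {
        return Optional.ofNullable(REGISTRY.get(eventType));
    }

    public static void main(String[] args) {
        System.out.println(outputTopicFor("billing")); // Optional[billing]
        System.out.println(outputTopicFor("unknown")); // Optional.empty
    }
}
```

Unregistered event types surface as `Optional.empty`, which the job can route to a dead-letter sink rather than dropping silently.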

Re: Multiple Sinks for a Single Source

2020-05-28 Thread Prasanna kumar
Alexander,

Thanks for the reply. Will implement and come back in case of any questions.

Prasanna.

On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov wrote:
> Hi Prasanna,
>
> if the set of all possible sinks is known in advance, side outputs will be
> generic enough to express your

Re: Multiple Sinks for a Single Source

2020-05-28 Thread Alexander Fedulov
Hi Prasanna,

if the set of all possible sinks is known in advance, side outputs will be generic enough to express your requirements. Each side output produces a stream. Create all of the side output tags, associate each of them with one sink, add conditional logic around `ctx.output(outputTag, ...
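The conditional side-output routing Alexander describes can be modeled without Flink: each tag collects the records routed to it, and each tag would then feed exactly one sink. A minimal sketch (all names hypothetical; in Flink the tags would be `OutputTag` instances and the routing would happen inside a `ProcessFunction` via `ctx.output`):

```java
import java.util.*;

public class SideOutputSketch {
    // Plain-Java model of side outputs: route each event to the list
    // registered for its tag; unmapped events go to an "unmatched" tag.
    static Map<String, List<String>> route(List<String> events,
                                           Map<String, String> eventToTag) {
        Map<String, List<String>> outputs = new HashMap<>();
        for (String event : events) {
            String tag = eventToTag.getOrDefault(event, "unmatched");
            outputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(event);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, String> mapping =
            Map.of("billing", "billing-tag", "cost", "cost-tag");
        System.out.println(route(List.of("billing", "cost", "billing"), mapping));
    }
}
```

Because the set of tags is fixed up front, this matches the constraint that all possible sinks must be known when the job is built.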

Re: Multiple Sinks for a Single Source

2020-05-26 Thread Prasanna kumar
Piotr,

There is an event and subscriber registry as a JSON file which has the table-event mapping and the event-subscriber mapping as mentioned below. Based on that JSON, we need the job to go through the table updates and create events, and for each event the way to sink it is configured. The

Re: Multiple Sinks for a Single Source

2020-05-26 Thread Piotr Nowojski
Hi,

I’m not sure if I fully understand what you mean by

> The point is the sink are not predefined.

You must know, before submitting the job, what sinks are going to be used in the job. You can have some custom logic that would filter out records before writing them to the sinks, as I

Re: Multiple Sinks for a Single Source

2020-05-26 Thread Prasanna kumar
Thanks Piotr for the reply. I will explain my requirement in detail.

Table Updates -> Generate Business Events -> Subscribers

*Source Side*
There are CDC streams of 100 tables which the framework needs to listen to.

*Event Table Mapping*
There would be events associated with tables in an *m:n* fashion.
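The m:n table-to-event relationship described above can be sketched as a map from table name to the list of event types it produces (all table and event names below are hypothetical placeholders, since the actual registry is not shown):

```java
import java.util.*;

public class EventTableMapping {
    // Hypothetical m:n mapping: one table update can produce several
    // business events, and one event type can be fed by several tables.
    static final Map<String, List<String>> TABLE_TO_EVENTS = Map.of(
        "orders",   List.of("billing", "cost"),
        "payments", List.of("billing")
    );

    static List<String> eventsFor(String table) {
        return TABLE_TO_EVENTS.getOrDefault(table, List.of());
    }

    public static void main(String[] args) {
        System.out.println(eventsFor("orders"));   // [billing, cost]
        System.out.println(eventsFor("payments")); // [billing]
    }
}
```

In the job, each CDC record would be looked up here first, then each resulting event would be routed onward to its configured sink.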

Re: Multiple Sinks for a Single Source

2020-05-26 Thread Piotr Nowojski
Hi,

You could easily filter/map/process the streams differently before writing them to the sinks. Building on top of my previous example, this also should work fine:

DataStream myStream = env.addSource(…).foo().bar(); // for custom source, but any
myStream.baz().addSink(sink1);
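The idea of per-sink transformations before writing can be modeled outside Flink as the same record list passed through a different predicate for each sink. A minimal sketch under that assumption (`sinkView` is an illustrative name, not a Flink API):

```java
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilteredFanOut {
    // Model of "filter the stream differently before each sink":
    // the same records pass through a per-sink predicate.
    static List<Integer> sinkView(List<Integer> stream,
                                  Predicate<Integer> filter) {
        return stream.stream().filter(filter).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5);
        System.out.println(sinkView(records, n -> n % 2 == 0)); // [2, 4]
        System.out.println(sinkView(records, n -> n > 3));      // [4, 5]
    }
}
```

This mirrors `myStream.baz().addSink(sink1)` versus `myStream.qux().addSink(sink2)`: each sink sees only the records its own preceding transformation lets through, while the source is consumed once.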

Re: Multiple Sinks for a Single Source

2020-05-25 Thread Prasanna kumar
Piotr,

Thanks for the reply.

There is one other case, where some events have to be written to multiple sinks while others have to be written to just one sink. How could I have a common code flow/DAG for the same? I do not want multiple jobs to do the same; I want to accomplish this in a single job

Re: Multiple Sinks for a Single Source

2020-05-25 Thread Piotr Nowojski
Hi,

To the best of my knowledge the following pattern should work just fine:

DataStream myStream = env.addSource(…).foo().bar(); // for custom source, but any
myStream.addSink(sink1);
myStream.addSink(sink2);
myStream.addSink(sink3);

All of the records from `myStream` would be passed to each
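The fan-out semantics of attaching several sinks to one stream can be modeled in plain Java by delivering every record to every registered consumer. A minimal sketch, with each sink stood in for by a list-appending `Consumer` (names are illustrative):

```java
import java.util.*;
import java.util.function.Consumer;

public class FanOut {
    // Model of attaching several sinks to one stream: every record
    // is delivered to every registered sink (consumer).
    static void broadcast(List<String> stream, List<Consumer<String>> sinks) {
        for (String record : stream) {
            for (Consumer<String> sink : sinks) {
                sink.accept(record);
            }
        }
    }

    public static void main(String[] args) {
        List<String> sink1 = new ArrayList<>();
        List<String> sink2 = new ArrayList<>();
        broadcast(List.of("a", "b"), List.of(sink1::add, sink2::add));
        System.out.println(sink1); // [a, b]
        System.out.println(sink2); // [a, b]
    }
}
```

In Flink the replication happens inside the dataflow graph rather than in a loop, but the observable behavior is the same: each sink receives the full stream.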