Hi Prasanna,

That’s good to hear and thanks for confirming that it works :)

Piotrek

> On 3 Jun 2020, at 16:09, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
> 
> Piotr and Alexander , 
> 
> I have fixed the programmatic error in filter method and it is working now.
> 
> Thanks for the detailed help from both of you. Am able to add the sinks based 
> on the JSON and create DAG.
> 
> Thanks,
> Prasanna.
> 
> On Wed, Jun 3, 2020 at 4:51 PM Piotr Nowojski <pi...@ververica.com 
> <mailto:pi...@ververica.com>> wrote:
> Hi Prasanna,
> 
> 1.
> 
> > The object probably contains or references non serializable fields.
> 
> That should speak for itself. Flink was not able to distribute your code to 
> the worker nodes. 
> 
> You have used a lambda function that turned out to be non serialisable. You 
> should unit test your code and in this case add a 
> serialisation/deserialisation round trip unit test for the filter function. 
> For starters I would suggest to not use lambda function, but a full/proper 
> named class and work from there.
> 
> 2.
> 
> Can not you create an array/map/collection of OutputTags corresponding to the 
> the sinks/topics combinations. One OutputTag per sink(/topic) and use this 
> array/map/collection inside your process function?
> 
> Piotrek 
> 
>> On 2 Jun 2020, at 13:49, Prasanna kumar <prasannakumarram...@gmail.com 
>> <mailto:prasannakumarram...@gmail.com>> wrote:
>> 
>> Hi ,
>> 
>> I have a Event router Registry as this. By reading this as input i need to 
>> create a Job which would redirect the messages to the correct sink as per 
>> condition.
>> {
>>   "eventRouterRegistry": [
>>     { "eventType": "biling", "outputTopic": "billing" },
>>     { "eventType": "cost", "outputTopic": "cost" },
>>     { "eventType": "marketing", "outputTopic": "marketing" }
>>   ]
>> }
>> I tried the following two approaches. 
>> 1) Using the Filter method
>> 
>> public static void setupRouterJobsFilter(
>>             List<eventRouterRegistry> registryList, 
>> StreamExecutionEnvironment env) {
>> 
>>    Properties props = new Properties();
>>    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>>    props.put("client.id <http://client.id/>", "flink-example1");
>> 
>>    FlinkKafkaConsumer011 fkC =
>>                new FlinkKafkaConsumer011<>("EVENTTOPIC", new 
>> SimpleStringSchema(), props);
>> 
>>    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
>> 
>>    for (eventRouterRegistry record : registryList) {
>>       System.out.print(record.getEventType() + " <==> " + 
>> record.getOutputTopic());
>> 
>>       FlinkKafkaProducer011 fkp =
>>                   new FlinkKafkaProducer011<>(record.getOutputTopic(), new 
>> SimpleStringSchema(), props);
>> 
>>       inputStream.filter(msg -> msg.equals(record.getEventType()) );
>>       //sideOutputStream.print();
>>       inputStream.addSink(fkp).name(record.getOutputTopic());
>>    }
>> }
>> Here am getting the following error. 
>>  ./bin/flink run -c firstflinkpackage.GenericStreamRouter 
>> ../../myrepository/flink001/target/flink001-1.0.jar
>> Starting execution of program
>> ---------------------------
>>  The program finished with the following exception:
>> 
>> The implementation of the FilterFunction is not serializable. The object 
>> probably contains or references non serializable fields.
>>      org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>>      
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
>>      
>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>      
>> org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)
>>      
>> firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)
>>      firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)
>> 
>> Looks like I should not use  record.getEventType() as this is outside of the 
>> stream. 
>> 
>> Is there a any way to use external variable here mainly to Generalise the 
>> process.
>> 
>> 2) Using the Side Output method
>> 
>> Following code is my attempt in creating a generic way for sideoutput 
>> creation. 
>> 
>> I am able to create the Sink Streams based on the list 
>> (eventRouterRegistry). 
>> 
>> But i could not generalise the Output tag creation. 
>> The issue here is the output tag is fixed. 
>> My output tag need to be the Event Type and that needs to be in Process 
>> Function too. 
>> 
>> How do i implement. Should I write my own process function ?
>>  public static void setupRouterJobs(
>>      List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) 
>> {
>> 
>>    Properties props = new Properties();
>>    props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>>    props.put("client.id <http://client.id/>", "flink-example1");
>> 
>>    FlinkKafkaConsumer011 fkC =
>>        new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), 
>> props);
>> 
>>    DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
>>      //Even if i try to generalise OUtput tag here. How do i do it inside 
>> ProcessFunction
>>      final OutputTag<String> outputTag = new 
>> OutputTag<String>("side-output") {};
>>      SingleOutputStreamOperator<String> mainDataStream =
>>        inputStream.process(
>>            new ProcessFunction<String, String>() {
>> 
>>              @Override
>>              public void processElement(String value, Context ctx, 
>> Collector<String> out)
>>                  throws Exception {
>>                // emit data to side output
>>                ctx.output(OutputTag,  value);
>>              }
>>            });
>> 
>>    for (eventRouterRegistry record : registryList) {
>>      System.out.print(record.getEventType() + " <==> " + 
>> record.getOutputTopic());
>> 
>>      FlinkKafkaProducer011 fkp =
>>          new FlinkKafkaProducer011<>(record.getOutputTopic(), new 
>> SimpleStringSchema(), props);
>> 
>>      DataStream<String> sideOutputStream = 
>> mainDataStream.getSideOutput(outputTag);
>>      sideOutputStream.print();
>>      sideOutputStream.addSink(fkp).name(record.getOutputTopic());
>>    }
>> }
>> 
>> Thanks,
>> Prasanna.
>> 
>> 
>> 
>> On Thu, May 28, 2020 at 8:24 PM Prasanna kumar 
>> <prasannakumarram...@gmail.com <mailto:prasannakumarram...@gmail.com>> wrote:
>> Alexander, 
>> 
>> Thanks for the reply. Will implement and come back in case of any questions. 
>> 
>> Prasanna.
>> 
>> On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <alexan...@ververica.com 
>> <mailto:alexan...@ververica.com>> wrote:
>> Hi Prasanna,
>> 
>> if the set of all possible sinks is known in advance, side outputs will be 
>> generic enough to express your requirements. Side output produces a stream. 
>> Create all of the side output tags, associate each of them with one sink, 
>> add conditional logic around `ctx.output(outputTag, ... );`  to decide where 
>> to dispatch the messages  (see [1]), collect to none or many side outputs, 
>> depending on your logic.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html>
>> --
>> Alexander Fedulov | Solutions Architect
>> 
>> <image.png>
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>>  <https://www.ververica.com/>
>> Follow us @VervericaData
>> --
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>> Stream Processing | Event Driven | Real Time
>> 
>> 
>> On Tue, May 26, 2020 at 2:57 PM Prasanna kumar 
>> <prasannakumarram...@gmail.com <mailto:prasannakumarram...@gmail.com>> wrote:
>> Piotr, 
>> 
>> There is an event and subscriber registry as JSON file which has the table 
>> event mapping and event-subscriber mapping as mentioned below.
>> 
>> Based on the set JSON , we need to job to go through the table updates and 
>> create events and for each event there is a way set how to sink them.
>> 
>> The sink streams have to be added based on this JSON. Thats what i mentioned 
>> as no predefined sink in code earlier.
>> 
>> You could see that each event has different set of sinks. 
>> 
>> Just checking how much generic could Side-output streams be ?.
>> 
>> Source -> generate events -> (find out sinks dynamically in code ) -> write 
>> to the respective sinks. 
>> 
>> {
>>   " tablename ": "source.table1",
>>   "events": [
>>     {
>>       "operation": "update",
>>       "eventstobecreated": [
>>         {
>>           "eventname": "USERUPDATE",
>>           "Columnoperation": "and",
>>           "ColumnChanges": [
>>             {
>>               "columnname": "name"
>>             },
>>             {
>>               "columnname": "loginenabled",
>>               "value": "Y"
>>             }
>>           ],
>>           "Subscribers": [
>>             {
>>               "customername": "c1",
>>               "method": "Kafka",
>>               "methodparams": {
>>                 "topicname": "USERTOPIC"
>>               }
>>             },
>>             {
>>               "customername": "c2",
>>               "method": "S3",
>>               "methodparams": {
>>                 "folder": "aws://folderC2 <>"
>>               }}, ]}]
>>     },
>>     {
>>       "operation": "insert",
>>       "eventstobecreated": [
>>           "eventname": "USERINSERT",
>>           "operation": "insert",
>>           "Subscribers": [
>>             {
>>               "teamname": "General",
>>               "method": "Kafka",
>>               "methodparams": {
>>                 "topicname": "new_users"
>>               }
>>             },
>>             {
>>               "teamname": "General",
>>               "method": "kinesis",
>>               "methodparams": {
>>                 "URL": "new_users",
>>                 "username": "uname",
>>                 "password":  "pwd"
>>               }}, ]}]
>>     },
>>     {
>>       "operation": "delete",
>>       "eventstobecreated": [
>>         {
>>           "eventname": "USERDELETE",
>>           "Subscribers": [
>>             {
>>               "customername": "c1",
>>               "method": "Kafka",
>>               "methodparams": {
>>                 "topicname": "USERTOPIC"
>>               }
>>             },
>>             {
>>               "customername": "c4",
>>               "method": "Kafka",
>>               "methodparams": {
>>                 "topicname": "deleterecords"
>>              }}, ]}]
>>      },
>> }
>> 
>> Please let me know your thoughts on this.
>> 
>> Thanks,
>> Prasanna.
>> 
>> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com 
>> <mailto:pi...@ververica.com>> wrote:
>> Hi,
>> 
>> I’m not sure if I fully understand what do you mean by
>> 
>> > The point is the sink are not predefined.
>> 
>> You must know before submitting the job, what sinks are going to be used in 
>> the job. You can have some custom logic, that would filter out records 
>> before writing them to the sinks, as I proposed before, or you could use 
>> side outputs [1] would be better suited to your use case? 
>> 
>> Piotrek
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html>
>> 
>>> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com 
>>> <mailto:prasannakumarram...@gmail.com>> wrote:
>>> 
>>> Thanks Piotr for the Reply. 
>>> 
>>> I will explain my requirement in detail. 
>>> 
>>> Table Updates -> Generate Business Events -> Subscribers 
>>> 
>>> Source Side
>>> There are CDC of 100 tables which the framework needs to listen to. 
>>> 
>>> Event Table Mapping
>>> 
>>> There would be Event associated with table in a m:n fashion. 
>>> 
>>> say there are tables TA, TB, TC. 
>>> 
>>> EA, EA2 and EA3 are generated from TA (based on conditions)
>>> EB generated from TB (based on conditions)
>>> EC generated from TC (no conditions.)
>>> 
>>> Say there are events EA,EB,EC generated from the tables TA, TB, TC 
>>> 
>>> Event Sink Mapping
>>> 
>>> EA has following sinks. kafka topic SA,SA2,SAC. 
>>> EB has following sinks. kafka topic SB , S3 sink and a rest endpoint RB.
>>> EC has only rest endpoint RC. 
>>> 
>>> The point is the sink are not predefined. [. But i only see the example 
>>> online where , flink code having explicit myStream.addSink(sink2);   ]
>>> 
>>> We expect around 500 types of events in our platform in another 2 years 
>>> time. 
>>> 
>>> We are looking at writing a generic job for the same , rather than writing 
>>> one for new case.
>>> 
>>> Let me know your thoughts and flink suitability to this requirement.
>>> 
>>> Thanks
>>> Prasanna.
>>> 
>>> 
>>> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com 
>>> <mailto:pi...@ververica.com>> wrote:
>>> Hi,
>>> 
>>> You could easily filter/map/process the streams differently before writing 
>>> them to the sinks. Building on top of my previous example, this also should 
>>> work fine:
>>> 
>>> 
>>> DataStream myStream = env.addSource(…).foo().bar() // for custom source, 
>>> but any ;
>>> 
>>> myStream.baz().addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.qux().quuz().corge().addSink(sink3);
>>>  
>>> Where foo/bar/baz/quz/quuz/corge are any stream processing functions that 
>>> you wish. `foo` and `bar` would be applied once to the stream, before it’s 
>>> going to be split to different sinks, while `baz`, `qux`, `quuz` and 
>>> `corge` would be applied to only of the sinks AFTER splitting.
>>> 
>>> In your case, it could be:
>>> 
>>> myStream.filter(...).addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.addSink(sink3);
>>> 
>>> So sink2 and sink3 would get all of the records, while sink1 only a portion 
>>> of them.
>>> 
>>> Piotrek 
>>> 
>>> 
>>>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com 
>>>> <mailto:prasannakumarram...@gmail.com>> wrote:
>>>> 
>>>> Piotr, 
>>>> 
>>>> Thanks for the reply. 
>>>> 
>>>> There is one other case, where some events have to be written to multiple 
>>>> sinks and while other have to be written to just one sink. 
>>>> 
>>>> How could i have a common codeflow/DAG for the same ?
>>>> 
>>>> I do not want multiple jobs to do the same want to accomplish in a single 
>>>> job .
>>>> 
>>>> Could i add Stream code "myStream.addSink(sink1)" under a conditional 
>>>> operator such as 'if' to determine . 
>>>> 
>>>> But i suppose here the stream works differently compared to normal code 
>>>> processing.
>>>> 
>>>> Prasanna.
>>>> 
>>>> 
>>>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com 
>>>> <mailto:pi...@ververica.com>> wrote:
>>>> Hi,
>>>> 
>>>> To the best of my knowledge the following pattern should work just fine:
>>>> 
>>>> DataStream myStream = env.addSource(…).foo().bar() // for custom source, 
>>>> but any ;
>>>> myStream.addSink(sink1);
>>>> myStream.addSink(sink2);
>>>> myStream.addSink(sink3);
>>>> 
>>>> All of the records from `myStream` would be passed to each of the sinks.
>>>> 
>>>> Piotrek
>>>> 
>>>> > On 24 May 2020, at 19:34, Prasanna kumar <prasannakumarram...@gmail.com 
>>>> > <mailto:prasannakumarram...@gmail.com>> wrote:
>>>> > 
>>>> > Hi,
>>>> > 
>>>> > There is a single source of events for me in my system. 
>>>> > 
>>>> > I need to process and send the events to multiple destination/sink at 
>>>> > the same time.[ kafka topic, s3, kinesis and POST to HTTP endpoint ]
>>>> > 
>>>> > I am able send to one sink.
>>>> > 
>>>> > By adding more sink stream to the source stream could we achieve it . 
>>>> > Are there any shortcomings.  
>>>> > 
>>>> > Please let me know if any one here has successfully implemented one .
>>>> > 
>>>> > Thanks,
>>>> > Prasanna.
> 

Reply via email to