Hi Prasanna, That’s good to hear and thanks for confirming that it works :)
Piotrek

> On 3 Jun 2020, at 16:09, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>
> Piotr and Alexander,
>
> I have fixed the programmatic error in the filter method and it is working now.
>
> Thanks for the detailed help from both of you. I am able to add the sinks based
> on the JSON and create the DAG.
>
> Thanks,
> Prasanna.
>
> On Wed, Jun 3, 2020 at 4:51 PM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi Prasanna,
>
> 1.
>
> > The object probably contains or references non serializable fields.
>
> That should speak for itself. Flink was not able to distribute your code to
> the worker nodes.
>
> You have used a lambda function that turned out to be non serialisable. You
> should unit test your code and in this case add a
> serialisation/deserialisation round trip unit test for the filter function.
> For starters I would suggest not using a lambda function, but a full/proper
> named class, and working from there.
>
> 2.
>
> Can't you create an array/map/collection of OutputTags corresponding to the
> sink/topic combinations, one OutputTag per sink (or topic), and use this
> array/map/collection inside your process function?
>
> Piotrek
>
>> On 2 Jun 2020, at 13:49, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>
>> Hi,
>>
>> I have an event router registry like this. Reading it as input, I need to
>> create a job which redirects the messages to the correct sink as per the
>> condition.
>> {
>>   "eventRouterRegistry": [
>>     { "eventType": "biling", "outputTopic": "billing" },
>>     { "eventType": "cost", "outputTopic": "cost" },
>>     { "eventType": "marketing", "outputTopic": "marketing" }
>>   ]
>> }
>> I tried the following two approaches.
>> 1) Using the Filter method
>>
>> public static void setupRouterJobsFilter(
>>     List<eventRouterRegistry> registryList,
>>     StreamExecutionEnvironment env) {
>>
>>   Properties props = new Properties();
>>   props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>>   props.put("client.id", "flink-example1");
>>
>>   FlinkKafkaConsumer011 fkC =
>>       new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);
>>
>>   DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
>>
>>   for (eventRouterRegistry record : registryList) {
>>     System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());
>>
>>     FlinkKafkaProducer011 fkp =
>>         new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);
>>
>>     inputStream.filter(msg -> msg.equals(record.getEventType()) );
>>     //sideOutputStream.print();
>>     inputStream.addSink(fkp).name(record.getOutputTopic());
>>   }
>> }
>> Here am getting the following error.
>> ./bin/flink run -c firstflinkpackage.GenericStreamRouter
>> ../../myrepository/flink001/target/flink001-1.0.jar
>> Starting execution of program
>> ---------------------------
>> The program finished with the following exception:
>>
>> The implementation of the FilterFunction is not serializable. The object
>> probably contains or references non serializable fields.
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>> org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:686)
>> firstflinkpackage.GenericStreamRouter.setupRouterJobsFilter(GenericStreamRouter.java:118)
>> firstflinkpackage.GenericStreamRouter.main(GenericStreamRouter.java:93)
>>
>> Looks like I should not use record.getEventType() as this is outside of the
>> stream.
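A round-trip test of the kind suggested above could look roughly like this.
It is only a sketch using plain JDK serialization: EventTypeFilter and
roundTrip are made-up names, and in the real job the class would also
implement Flink's FilterFunction<String>. The filter keeps just the String it
compares against instead of capturing the whole registry record, which is
presumably what made the lambda non-serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class FilterRoundTrip {

    /** Hypothetical named filter; holds only the String it needs, not the registry record. */
    static class EventTypeFilter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String eventType;

        EventTypeFilter(String eventType) {
            this.eventType = eventType;
        }

        /** Same shape as FilterFunction#filter: keep the message if it matches the event type. */
        boolean filter(String msg) {
            return eventType.equals(msg);
        }
    }

    /** Serializes the object to bytes and reads it back; fails loudly if that is not possible. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // NotSerializableException is an IOException, so it ends up here.
            throw new IllegalStateException("not serializable: " + obj.getClass(), e);
        }
    }

    public static void main(String[] args) {
        EventTypeFilter copy = roundTrip(new EventTypeFilter("cost"));
        System.out.println(copy.filter("cost"));    // true
        System.out.println(copy.filter("billing")); // false
    }
}
```

If a field of a non-serializable type is added to the filter later, roundTrip
throws, so the problem shows up in a unit test rather than at job submission.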
>>
>> Is there any way to use an external variable here, mainly to generalise the
>> process?
>>
>> 2) Using the Side Output method
>>
>> The following code is my attempt at creating a generic way for side output
>> creation.
>>
>> I am able to create the sink streams based on the list
>> (eventRouterRegistry).
>>
>> But I could not generalise the output tag creation.
>> The issue here is that the output tag is fixed.
>> My output tag needs to be the event type, and that needs to be in the process
>> function too.
>>
>> How do I implement this? Should I write my own process function?
>> public static void setupRouterJobs(
>>     List<eventRouterRegistry> registryList, StreamExecutionEnvironment env) {
>>
>>   Properties props = new Properties();
>>   props.put("bootstrap.servers", BOOTSTRAP_SERVER);
>>   props.put("client.id", "flink-example1");
>>
>>   FlinkKafkaConsumer011 fkC =
>>       new FlinkKafkaConsumer011<>("EVENTTOPIC", new SimpleStringSchema(), props);
>>
>>   DataStream<String> inputStream = env.addSource(fkC).name("EVENTTOPIC");
>>   // Even if I try to generalise the output tag here, how do I do it inside
>>   // ProcessFunction?
>>   final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
>>   SingleOutputStreamOperator<String> mainDataStream =
>>       inputStream.process(
>>           new ProcessFunction<String, String>() {
>>
>>             @Override
>>             public void processElement(String value, Context ctx, Collector<String> out)
>>                 throws Exception {
>>               // emit data to side output
>>               ctx.output(outputTag, value);
>>             }
>>           });
>>
>>   for (eventRouterRegistry record : registryList) {
>>     System.out.print(record.getEventType() + " <==> " + record.getOutputTopic());
>>
>>     FlinkKafkaProducer011 fkp =
>>         new FlinkKafkaProducer011<>(record.getOutputTopic(), new SimpleStringSchema(), props);
>>
>>     DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
>>     sideOutputStream.print();
>>     sideOutputStream.addSink(fkp).name(record.getOutputTopic());
>>   }
>> }
>>
>> Thanks,
>> Prasanna.
>>
>>
>> On Thu, May 28, 2020 at 8:24 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>> Alexander,
>>
>> Thanks for the reply. Will implement and come back in case of any questions.
>>
>> Prasanna.
>>
>> On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov <alexan...@ververica.com> wrote:
>> Hi Prasanna,
>>
>> if the set of all possible sinks is known in advance, side outputs will be
>> generic enough to express your requirements. Side output produces a stream.
>> Create all of the side output tags, associate each of them with one sink,
>> add conditional logic around `ctx.output(outputTag, ... );` to decide where
>> to dispatch the messages (see [1]), collect to none or many side outputs,
>> depending on your logic.
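One possible shape for the "conditional logic around ctx.output" is a lookup
from event type to side-output names, built once from the registry. The sketch
below models only that lookup in plain Java. Router, register and route are
hypothetical names, and the extra "audit" topic is invented for illustration.
In a Flink job each name would presumably key an OutputTag<String>, and
processElement would call ctx.output(tag, value) once for every tag returned.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SideOutputRouting {

    /**
     * Hypothetical routing table: event type -> names of the side outputs it feeds.
     * Serializable because a process function holding it has to be shipped to the workers.
     */
    static class Router implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Map<String, List<String>> tagsByEventType = new LinkedHashMap<>();

        /** Registers one (eventType, outputTopic) row of the registry. */
        void register(String eventType, String outputTopic) {
            tagsByEventType.computeIfAbsent(eventType, k -> new ArrayList<>()).add(outputTopic);
        }

        /** Returns none, one or many side-output names for the given event type. */
        List<String> route(String eventType) {
            return tagsByEventType.getOrDefault(eventType, Collections.emptyList());
        }
    }

    public static void main(String[] args) {
        Router router = new Router();
        router.register("cost", "cost");
        router.register("marketing", "marketing");
        router.register("marketing", "audit"); // invented second sink for the same event

        for (String eventType : new String[] {"cost", "marketing", "unknown"}) {
            // In processElement, this is where each returned name would be
            // resolved to its tag and passed to ctx.output(tag, value).
            System.out.println(eventType + " -> " + router.route(eventType));
        }
    }
}
```

An unknown event type simply maps to an empty list, which corresponds to
"collect to none" in the description above.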
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>> --
>> Alexander Fedulov | Solutions Architect
>>
>> On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>> Piotr,
>>
>> There is an event and subscriber registry as a JSON file, which has the
>> table-event mapping and the event-subscriber mapping as shown below.
>>
>> Based on this JSON, we need the job to go through the table updates and
>> create events, and for each event the JSON sets how to sink it.
>>
>> The sink streams have to be added based on this JSON. That's what I meant
>> earlier by no predefined sinks in the code.
>>
>> You can see that each event has a different set of sinks.
>>
>> Just checking how generic side-output streams can be.
>>
>> Source -> generate events -> (find out sinks dynamically in code) -> write
>> to the respective sinks.
>>
>> {
>>   "tablename": "source.table1",
>>   "events": [
>>     {
>>       "operation": "update",
>>       "eventstobecreated": [
>>         {
>>           "eventname": "USERUPDATE",
>>           "Columnoperation": "and",
>>           "ColumnChanges": [
>>             { "columnname": "name" },
>>             { "columnname": "loginenabled", "value": "Y" }
>>           ],
>>           "Subscribers": [
>>             {
>>               "customername": "c1",
>>               "method": "Kafka",
>>               "methodparams": { "topicname": "USERTOPIC" }
>>             },
>>             {
>>               "customername": "c2",
>>               "method": "S3",
>>               "methodparams": { "folder": "aws://folderC2" }
>>             }
>>           ]
>>         }
>>       ]
>>     },
>>     {
>>       "operation": "insert",
>>       "eventstobecreated": [
>>         {
>>           "eventname": "USERINSERT",
>>           "operation": "insert",
>>           "Subscribers": [
>>             {
>>               "teamname": "General",
>>               "method": "Kafka",
>>               "methodparams": { "topicname": "new_users" }
>>             },
>>             {
>>               "teamname": "General",
>>               "method": "kinesis",
>>               "methodparams": {
>>                 "URL": "new_users",
>>                 "username": "uname",
>>                 "password": "pwd"
>>               }
>>             }
>>           ]
>>         }
>>       ]
>>     },
>>     {
>>       "operation": "delete",
>>       "eventstobecreated": [
>>         {
>>           "eventname": "USERDELETE",
>>           "Subscribers": [
>>             {
>>               "customername": "c1",
>>               "method": "Kafka",
>>               "methodparams": { "topicname": "USERTOPIC" }
>>             },
>>             {
>>               "customername": "c4",
>>               "method": "Kafka",
>>               "methodparams": { "topicname": "deleterecords" }
>>             }
>>           ]
>>         }
>>       ]
>>     }
>>   ]
>> }
>>
>> Please let me know your thoughts on this.
>>
>> Thanks,
>> Prasanna.
>>
>> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> Hi,
>>
>> I’m not sure if I fully understand what you mean by
>>
>> > The point is the sink are not predefined.
>>
>> You must know, before submitting the job, which sinks are going to be used in
>> the job. You can have some custom logic that filters out records
>> before writing them to the sinks, as I proposed before, or you could use
>> side outputs [1]. Maybe those would be better suited to your use case?
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>
>>> On 26 May 2020, at 12:20, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>>
>>> Thanks Piotr for the reply.
>>>
>>> I will explain my requirement in detail.
>>>
>>> Table Updates -> Generate Business Events -> Subscribers
>>>
>>> Source Side
>>> There is CDC on 100 tables which the framework needs to listen to.
>>>
>>> Event Table Mapping
>>>
>>> Events are associated with tables in an m:n fashion.
>>>
>>> Say there are tables TA, TB, TC.
>>>
>>> EA, EA2 and EA3 are generated from TA (based on conditions)
>>> EB generated from TB (based on conditions)
>>> EC generated from TC (no conditions.)
>>>
>>> Say there are events EA, EB, EC generated from the tables TA, TB, TC.
>>>
>>> Event Sink Mapping
>>>
>>> EA has the following sinks: Kafka topics SA, SA2, SAC.
>>> EB has the following sinks: Kafka topic SB, an S3 sink and a REST endpoint RB.
>>> EC has only a REST endpoint RC.
>>>
>>> The point is the sink are not predefined. [But I only see examples
>>> online where the Flink code has an explicit myStream.addSink(sink2);]
>>>
>>> We expect around 500 types of events in our platform in another 2 years'
>>> time.
>>>
>>> We are looking at writing a generic job for this, rather than writing
>>> one for each new case.
>>>
>>> Let me know your thoughts and Flink's suitability for this requirement.
>>>
>>> Thanks
>>> Prasanna.
>>>
>>>
>>> On Tue, May 26, 2020 at 3:34 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>> Hi,
>>>
>>> You could easily filter/map/process the streams differently before writing
>>> them to the sinks.
>>> Building on top of my previous example, this also should
>>> work fine:
>>>
>>>
>>> DataStream myStream = env.addSource(…).foo().bar(); // for custom source, but any
>>>
>>> myStream.baz().addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.qux().quuz().corge().addSink(sink3);
>>>
>>> Where foo/bar/baz/qux/quuz/corge are any stream processing functions that
>>> you wish. `foo` and `bar` would be applied once to the stream, before it’s
>>> going to be split to different sinks, while `baz`, `qux`, `quuz` and
>>> `corge` would be applied to only one of the sinks AFTER splitting.
>>>
>>> In your case, it could be:
>>>
>>> myStream.filter(...).addSink(sink1);
>>> myStream.addSink(sink2);
>>> myStream.addSink(sink3);
>>>
>>> So sink2 and sink3 would get all of the records, while sink1 only a portion
>>> of them.
>>>
>>> Piotrek
>>>
>>>
>>>> On 26 May 2020, at 06:45, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>>>
>>>> Piotr,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> There is one other case, where some events have to be written to multiple
>>>> sinks while others have to be written to just one sink.
>>>>
>>>> How could I have a common codeflow/DAG for this?
>>>>
>>>> I do not want multiple jobs for this; I want to accomplish it in a single
>>>> job.
>>>>
>>>> Could I put stream code such as "myStream.addSink(sink1)" under a
>>>> conditional operator such as 'if' to decide this?
>>>>
>>>> But I suppose the stream works differently here compared to normal code
>>>> processing.
>>>>
>>>> Prasanna.
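The filter-per-sink pattern above can be modelled without Flink to see who
receives what. This is only a plain-Java sketch: FilteredSink and fanOut are
hypothetical names and the records are made up. It also bears on the 'if'
question: a Java `if` or loop around addSink runs once, while the job graph is
being built, so it can decide which sinks exist; a per-record decision has to
live inside a filter (or a process function), as each sink's predicate does
here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class FanOutModel {

    /** Hypothetical in-memory "sink" that accepts only records matching its filter. */
    static class FilteredSink {
        final String name;
        final Predicate<String> filter;
        final List<String> received = new ArrayList<>();

        FilteredSink(String name, Predicate<String> filter) {
            this.name = name;
            this.filter = filter;
        }

        /** Per-record decision, like a filter(...) placed in front of addSink(...). */
        void offer(String record) {
            if (filter.test(record)) {
                received.add(record);
            }
        }
    }

    /** Every record is offered to every sink, as with repeated addSink calls on one stream. */
    static void fanOut(List<String> records, List<FilteredSink> sinks) {
        for (String record : records) {
            for (FilteredSink sink : sinks) {
                sink.offer(record);
            }
        }
    }

    public static void main(String[] args) {
        List<FilteredSink> sinks = Arrays.asList(
                new FilteredSink("sink1", r -> r.startsWith("billing")), // only a portion
                new FilteredSink("sink2", r -> true),                    // everything
                new FilteredSink("sink3", r -> true));                   // everything

        fanOut(Arrays.asList("billing:1", "cost:2", "billing:3"), sinks);

        for (FilteredSink sink : sinks) {
            System.out.println(sink.name + " received " + sink.received);
        }
    }
}
```

With these inputs sink1 ends up with the two billing records, and sink2 and
sink3 with all three.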
>>>>
>>>>
>>>> On Mon 25 May, 2020, 23:37 Piotr Nowojski, <pi...@ververica.com> wrote:
>>>> Hi,
>>>>
>>>> To the best of my knowledge the following pattern should work just fine:
>>>>
>>>> DataStream myStream = env.addSource(…).foo().bar(); // for custom source, but any
>>>> myStream.addSink(sink1);
>>>> myStream.addSink(sink2);
>>>> myStream.addSink(sink3);
>>>>
>>>> All of the records from `myStream` would be passed to each of the sinks.
>>>>
>>>> Piotrek
>>>>
>>>> > On 24 May 2020, at 19:34, Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > There is a single source of events in my system.
>>>> >
>>>> > I need to process the events and send them to multiple destinations/sinks
>>>> > at the same time [Kafka topic, S3, Kinesis and POST to an HTTP endpoint].
>>>> >
>>>> > I am able to send to one sink.
>>>> >
>>>> > Could we achieve this by adding more sink streams to the source stream?
>>>> > Are there any shortcomings?
>>>> >
>>>> > Please let me know if anyone here has successfully implemented this.
>>>> >
>>>> > Thanks,
>>>> > Prasanna.