Thanks, Arvid. I will try to implement it using the broadcast approach.
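Roughly what I have in mind is the following (just a sketch, not tested: the config-topic
consumer behind configStream, the Tuple2<event_name, schema_subject> format and the
"event_name" field are assumptions on my side):

    // Config rules (event name -> schema subject) are pushed to a Kafka config topic
    // and broadcast to the main event stream, so a new event does not need a restart.
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    MapStateDescriptor<String, String> configDescriptor =
            new MapStateDescriptor<>("eventConfig", Types.STRING, Types.STRING);

    // configStream: (event_name, schema_subject) pairs read from the config topic
    BroadcastStream<Tuple2<String, String>> broadcastConfig =
            configStream.broadcast(configDescriptor);

    DataStream<GenericRecord> knownEvents = dataStream
            .connect(broadcastConfig)
            .process(new BroadcastProcessFunction<GenericRecord, Tuple2<String, String>, GenericRecord>() {

                @Override
                public void processBroadcastElement(Tuple2<String, String> rule, Context ctx,
                                                    Collector<GenericRecord> out) throws Exception {
                    // a new rule arrived on the config topic -> remember it
                    ctx.getBroadcastState(configDescriptor).put(rule.f0, rule.f1);
                }

                @Override
                public void processElement(GenericRecord record, ReadOnlyContext ctx,
                                           Collector<GenericRecord> out) throws Exception {
                    // forward only events for which a rule is known; the single
                    // demultiplexing sink downstream looks up the subject per event name
                    String eventName = record.get("event_name").toString();
                    if (ctx.getBroadcastState(configDescriptor).contains(eventName)) {
                        out.collect(record);
                    }
                }
            });

The demultiplexing sink you sketched would then sit behind this stream.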
On Fri, Jun 26, 2020 at 1:10 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> Yes, broadcast sounds really good. Now you just need to hide the structural
> invariance (variable number of sinks) by delegating to inner sinks.
>
> public class SplittingStreamingFileSink<IN>
>         extends RichSinkFunction<IN>
>         implements CheckpointedFunction, CheckpointListener {
>
>     Map<String, StreamingFileSink> sinks = ...; // event name to sink
>
>     // delegate each method of RichSinkFunction, CheckpointedFunction,
>     // CheckpointListener to the inner sinks
>
>     public void invoke(IN value, SinkFunction.Context context) throws Exception {
>         StreamingFileSink sink = sinks.get(value.getEventName());
>
>         if (sink == null) {
>             // create a new StreamingFileSink, add it to sinks, and call initializeState
>         }
>
>         sink.invoke(value, context);
>     }
> }
>
> Use the SplittingStreamingFileSink as the only sink in your workflow. You
> definitely need to wrap the SinkFunction.Context such that state gets
> prefixed (by eventName), but it should be rather straightforward.
>
> On Thu, Jun 25, 2020 at 3:47 PM aj <ajainje...@gmail.com> wrote:
>
>> Thanks, Arvid, for the detailed answers.
>>
>> - Have some kind of submitter that restarts Flink automatically on config
>> change (assumes that restart time is not the issue).
>> Yes, that can be written, but it does not solve the problem completely,
>> because I want to avoid the job restart itself. Every time I restart, I
>> also have to restore from the last checkpoint in S3.
>>
>> - Your job could periodically check for a change and then fail. However,
>> you need to fail in a way that the topology is rebuilt. I guess it's close
>> to a fatal error, and then the driver handles that error and restarts.
>> Again, same reason as the first, and I am also not sure how it would be
>> implemented.
>>
>> - Rewrite the job in such a way that you have only one sink in your job
>> graph which demultiplexes the events to several internal sinks. Then you
>> could simply add a new sink whenever a new event occurs.
>> Please give me some idea of how this can be implemented; I want to try
>> this approach.
>>
>> @Arvid Heise <ar...@ververica.com>
>> I am just thinking: can I use broadcast state, where the config rules that
>> I am keeping in YAML are pushed to Kafka itself? I mainly need only the
>> event-name to Avro-schema-subject mapping.
>> Please correct me if I am thinking in the wrong direction.
>>
>> On Thu, Jun 25, 2020 at 2:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> Would it be possible to implement a BucketAssigner that, for example,
>>> loads the configuration periodically from an external source and decides
>>> on a different sub-folder according to the event type?
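>>>
>>> Something along these lines is what I have in mind (a rough, untested
>>> sketch: the class name, the refresh interval, the "event_name" field and
>>> the loadConfig() helper are placeholders, and it only covers the bucketing
>>> (sub-folder) part, not the per-event Parquet schema):
>>>
>>> import java.util.Collections;
>>> import java.util.Map;
>>>
>>> import org.apache.avro.generic.GenericRecord;
>>> import org.apache.flink.core.io.SimpleVersionedSerializer;
>>> import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
>>> import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
>>>
>>> public class ConfigAwareBucketAssigner implements BucketAssigner<GenericRecord, String> {
>>>
>>>     private static final long REFRESH_INTERVAL_MS = 60_000;
>>>
>>>     private transient Map<String, String> eventToFolder; // event name -> sub-folder
>>>     private transient long lastRefresh;
>>>
>>>     @Override
>>>     public String getBucketId(GenericRecord element, BucketAssigner.Context context) {
>>>         long now = System.currentTimeMillis();
>>>         if (eventToFolder == null || now - lastRefresh > REFRESH_INTERVAL_MS) {
>>>             eventToFolder = loadConfig(); // e.g. re-read the YAML from S3 or HTTP
>>>             lastRefresh = now;
>>>         }
>>>         String eventName = element.get("event_name").toString();
>>>         return eventToFolder.getOrDefault(eventName, "unknown-events");
>>>     }
>>>
>>>     @Override
>>>     public SimpleVersionedSerializer<String> getSerializer() {
>>>         return SimpleVersionedStringSerializer.INSTANCE;
>>>     }
>>>
>>>     private Map<String, String> loadConfig() {
>>>         // placeholder: fetch and parse the external config here
>>>         return Collections.emptyMap();
>>>     }
>>> }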
>>>
>>> Thanks,
>>> Rafi
>>>
>>> On Wed, Jun 24, 2020 at 10:53 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> There is currently no way to dynamically change the topology. It would
>>>> be good to know why your current approach is not working (restart taking
>>>> too long? Too frequent changes?)
>>>>
>>>> So some ideas:
>>>> - Have some kind of submitter that restarts Flink automatically on config
>>>> change (assumes that restart time is not the issue).
>>>> - Your job could periodically check for a change and then fail. However,
>>>> you need to fail in a way that the topology is rebuilt. I guess it's close
>>>> to a fatal error, and then the driver handles that error and restarts.
>>>> - Rewrite the job in such a way that you have only one sink in your job
>>>> graph which demultiplexes the events to several internal sinks. Then you
>>>> could simply add a new sink whenever a new event occurs.
>>>>
>>>> The first option is the easiest and the last option the most versatile
>>>> (you could even have different sink types mixed).
>>>>
>>>> On Tue, Jun 23, 2020 at 5:34 AM aj <ajainje...@gmail.com> wrote:
>>>>
>>>>> I am stuck on this. Please give some suggestions.
>>>>>
>>>>> On Tue, Jun 9, 2020, 21:40 aj <ajainje...@gmail.com> wrote:
>>>>>
>>>>>> Please help with this. Any suggestions?
>>>>>>
>>>>>> On Sat, Jun 6, 2020 at 12:20 PM aj <ajainje...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello All,
>>>>>>>
>>>>>>> I am receiving a set of events in Avro format on different topics. I
>>>>>>> want to consume these and write them to S3 in Parquet format.
>>>>>>> I have written the below job that creates a different stream for each
>>>>>>> event and fetches its schema from the Confluent schema registry to
>>>>>>> create a Parquet sink for that event.
>>>>>>> This is working fine, but the only problem I am facing is that whenever
>>>>>>> a new event starts coming in, I have to change the YAML config and
>>>>>>> restart the job every time. Is there any way I do not have to restart
>>>>>>> the job and it starts consuming the new set of events?
>>>>>>>
>>>>>>> YAML config:
>>>>>>>
>>>>>>> !com.bounce.config.EventTopologyConfig
>>>>>>> eventsType:
>>>>>>>   - !com.bounce.config.EventConfig
>>>>>>>     event_name: "search_list_keyless"
>>>>>>>     schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>>>>>>>     topic: "search_list_keyless"
>>>>>>>
>>>>>>>   - !com.bounce.config.EventConfig
>>>>>>>     event_name: "bike_search_details"
>>>>>>>     schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>>>>>>>     topic: "bike_search_details"
>>>>>>>
>>>>>>>   - !com.bounce.config.EventConfig
>>>>>>>     event_name: "keyless_bike_lock"
>>>>>>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>>>>>>>     topic: "analytics-keyless"
>>>>>>>
>>>>>>>   - !com.bounce.config.EventConfig
>>>>>>>     event_name: "keyless_bike_unlock"
>>>>>>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>>>>>>>     topic: "analytics-keyless"
>>>>>>>
>>>>>>> checkPointInterval: 1200000
>>>>>>>
>>>>>>> topics: ["search_list_keyless", "bike_search_details", "analytics-keyless"]
>>>>>>>
>>>>>>> Sink code:
>>>>>>>
>>>>>>> YamlReader reader = new YamlReader(topologyConfig);
>>>>>>> EventTopologyConfig eventTopologyConfig =
>>>>>>>         reader.read(EventTopologyConfig.class);
>>>>>>>
>>>>>>> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
>>>>>>> topics = eventTopologyConfig.getTopics();
>>>>>>>
>>>>>>> List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
>>>>>>>
>>>>>>> CachedSchemaRegistryClient registryClient =
>>>>>>>         new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>>>>>>>
>>>>>>> FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
>>>>>>>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>         properties);
>>>>>>>
>>>>>>> DataStream<GenericRecord> dataStream =
>>>>>>>         streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>>>>>>>
>>>>>>> try {
>>>>>>>     for (EventConfig eventConfig : eventTypesList) {
>>>>>>>
>>>>>>>         LOG.info("creating a stream for {}", eventConfig.getEvent_name());
>>>>>>>
>>>>>>>         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>>>>>>                 .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
>>>>>>>                         SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
>>>>>>>                 .withBucketAssigner(new EventTimeBucketAssigner())
>>>>>>>                 .build();
>>>>>>>
>>>>>>>         DataStream<GenericRecord> outStream =
>>>>>>>                 dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
>>>>>>>                         genericRecord != null &&
>>>>>>>                         genericRecord.get(EVENT_NAME).toString()
>>>>>>>                                 .equals(eventConfig.getEvent_name()));
>>>>>>>
>>>>>>>         outStream.addSink(sink)
>>>>>>>                 .name(eventConfig.getSink_id())
>>>>>>>                 .setParallelism(parallelism);
>>>>>>>     }
>>>>>>> } catch (Exception e) {
>>>>>>>     e.printStackTrace();
>>>>>>> }
>>>>>>>
>>>>>>> --
>>>>>>> Thanks & Regards,
>>>>>>> Anuj Jain
>>>>
>>>> --
>>>> Arvid Heise | Senior Java Developer
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Arvid Heise | Senior Java Developer
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Thanks & Regards,
Anuj Jain
Mob.: +91-8588817877
Skype: anuj.jain07
<http://www.cse.iitm.ac.in/%7Eanujjain/>