Thanks, Arvid. I will try to implement this using the broadcast approach.

On Fri, Jun 26, 2020 at 1:10 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> Yes, broadcast sounds really good. Now you just need to hide the
> structural variance (variable number of sinks) by delegating to inner
> sinks.
>
> public class SplittingStreamingFileSink<IN>
>       extends RichSinkFunction<IN>
>       implements CheckpointedFunction, CheckpointListener {
>
>       Map<String, StreamingFileSink> sinks = ...; // event name to sink
>
>       // delegate each method of RichSinkFunction, CheckpointedFunction,
>       // CheckpointListener to sinks
>
>       public void invoke(IN value, SinkFunction.Context context)
>                   throws Exception {
>             StreamingFileSink sink = sinks.get(value.getEventName());
>
>             if (sink == null) {
>                   // create new StreamingFileSink, add to sinks and call
>                   // initializeState
>             }
>
>             sink.invoke(value, context);
>       }
> }
>
> Use the SplittingStreamingFileSink as the only sink in your workflow. You
> definitely need to wrap the SinkFunction.Context such that state gets
> prefixes (eventName), but it should be rather straightforward.
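
The delegation idea outlined above can be sketched without any Flink dependencies. This is only a minimal model of the routing logic, not the actual SplittingStreamingFileSink: `InnerSink`, `DemuxSink`, and `DemuxSinkDemo` are hypothetical names, and the checkpoint delegation (initializeState, snapshotState, notifyCheckpointComplete) is left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for one per-event sink; a real job would wrap a StreamingFileSink. */
interface InnerSink<T> {
    void write(T value);
}

/** Routes each record to an inner sink chosen by a key, creating sinks lazily. */
class DemuxSink<T> {
    private final Map<String, InnerSink<T>> sinks = new HashMap<>();
    private final Function<T, String> keyOf;               // e.g. extracts the event name
    private final Function<String, InnerSink<T>> factory;  // builds a sink for a new key

    DemuxSink(Function<T, String> keyOf, Function<String, InnerSink<T>> factory) {
        this.keyOf = keyOf;
        this.factory = factory;
    }

    void invoke(T value) {
        // computeIfAbsent calls the factory only the first time a key is seen
        sinks.computeIfAbsent(keyOf.apply(value), factory).write(value);
    }

    int sinkCount() {
        return sinks.size();
    }
}

class DemuxSinkDemo {
    public static void main(String[] args) {
        // Collect what each inner sink receives, keyed by event name
        Map<String, List<String>> written = new HashMap<>();
        DemuxSink<String> sink = new DemuxSink<String>(
            record -> record.split(":")[0],
            name -> value -> written.computeIfAbsent(name, k -> new ArrayList<>()).add(value));

        sink.invoke("keyless_bike_lock:a");
        sink.invoke("keyless_bike_unlock:b");
        sink.invoke("keyless_bike_lock:c");
        System.out.println(sink.sinkCount() + " sinks, records: " + written);
    }
}
```

The point of the sketch is that the outer object stays a single sink from the job graph's point of view, while the set of inner sinks grows as new event names show up.
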
>
> On Thu, Jun 25, 2020 at 3:47 PM aj <ajainje...@gmail.com> wrote:
>
>> Thanks, Arvid, for the detailed answers.
>> - Have some kind of submitter that restarts Flink automatically on config
>> change (assumes that restart time is not the issue).
>>   Yes, that can be written, but it does not solve the problem completely
>> because I want to avoid the job restart itself. Every time I restart, I
>> also have to restart from the last checkpoint in S3.
>>
>> - Your job could periodically check for a change and then fail. However,
>> you need to fail in a way that the topology is rebuilt. I guess it's close
>> to a fatal error and then the driver handles that error and restarts.
>>    Again, the same reason as for the first option applies. I am also not
>> sure how this would be implemented.
>>
>> - Rewrite the job in such a way that you have only one sink in your job
>> graph which demultiplexes the event to several internal sinks. Then you
>> could simply add a new sink whenever a new event occurs.
>>   Please give me some idea of how this could be done. I would like to
>> implement this option.
>>
>> @Arvid Heise <ar...@ververica.com>
>> I am also wondering whether I can use broadcast state, where the config
>> rules that I currently keep in YAML are pushed to Kafka instead, because
>> I mainly just need the mapping from event name to Avro schema subject.
>> Please correct me if I am thinking in the wrong direction.
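
As a rough illustration of the broadcast-state idea above, here is a plain-Java model of the two inputs: config messages that update an event name to schema subject mapping, and data records that look the mapping up. It deliberately contains no Flink code. In a real job the map would presumably live in Flink's broadcast state, and `EventRouting`, `onConfig`, `onEvent`, and `unrouted` are hypothetical names chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of the two-input logic; not Flink code. */
class EventRouting {
    // Plays the role of the broadcast state: event name -> schema subject
    private final Map<String, String> subjectByEvent = new HashMap<>();
    // Event names that arrived before any mapping was known for them
    private final List<String> unrouted = new ArrayList<>();

    /** Control side: called for every config message read from the config topic. */
    void onConfig(String eventName, String schemaSubject) {
        subjectByEvent.put(eventName, schemaSubject);
    }

    /** Data side: returns the schema subject for a record's event name, if known. */
    Optional<String> onEvent(String eventName) {
        String subject = subjectByEvent.get(eventName);
        if (subject == null) {
            // A real job would have to decide whether to buffer or drop such records
            unrouted.add(eventName);
        }
        return Optional.ofNullable(subject);
    }

    List<String> unrouted() {
        return unrouted;
    }
}

class EventRoutingDemo {
    public static void main(String[] args) {
        EventRouting routing = new EventRouting();

        // No mapping yet: the lookup is empty and the event name is recorded as unrouted
        System.out.println(routing.onEvent("keyless_bike_lock").isPresent());

        // A config message arrives, using values from the YAML config in this thread
        routing.onConfig("keyless_bike_lock",
            "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock");
        System.out.println(routing.onEvent("keyless_bike_lock").orElse("<no mapping>"));
    }
}
```

One thing the model makes visible is the ordering question: a record can arrive before the config message that describes it, so the job needs some policy for that case.
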
>>
>>
>>
>> On Thu, Jun 25, 2020 at 2:18 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> Would it be possible to implement a BucketAssigner that for example
>>> loads the configuration periodically from an external source and according
>>> to the event type decides on a different sub-folder?
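
The sub-folder decision described above could look roughly like the following plain-Java sketch. It only models the mapping from event name and timestamp to a bucket path. `EventBucketResolver`, `reload`, the `unknown` fallback folder, and the `dt=` path layout are all assumptions made for illustration, and wiring this into a Flink BucketAssigner (and loading the config periodically) is not shown.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Resolves a bucket path from an event name and an event timestamp. */
class EventBucketResolver {
    // Formats the day part of the path in UTC
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Event name -> sub-folder; refreshed from the external config source
    private final Map<String, String> folderByEvent = new ConcurrentHashMap<>();

    /** Merges the latest mapping; a periodic loader would call this. */
    void reload(Map<String, String> latest) {
        folderByEvent.putAll(latest);
    }

    /** Returns "<folder>/dt=<day>", falling back to "unknown" for unmapped events. */
    String bucketId(String eventName, long timestampMillis) {
        String folder = folderByEvent.getOrDefault(eventName, "unknown");
        return folder + "/dt=" + DAY.format(Instant.ofEpochMilli(timestampMillis));
    }
}

class EventBucketResolverDemo {
    public static void main(String[] args) {
        EventBucketResolver resolver = new EventBucketResolver();
        resolver.reload(Collections.singletonMap("bike_search_details", "bike_search_details"));
        System.out.println(resolver.bucketId("bike_search_details", System.currentTimeMillis()));
        System.out.println(resolver.bucketId("some_new_event", System.currentTimeMillis()));
    }
}
```

Note that this only chooses the folder. The sink in the original job is built with one Avro schema per event, so events with different schemas would still need separate writers.
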
>>>
>>> Thanks,
>>> Rafi
>>>
>>>
>>> On Wed, Jun 24, 2020 at 10:53 PM Arvid Heise <ar...@ververica.com>
>>> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> There is currently no way to dynamically change the topology. It would
>>>> be good to know why your current approach is not working (restart taking
>>>> too long? Too frequent changes?)
>>>>
>>>> So some ideas:
>>>> - Have some kind of submitter that restarts Flink automatically on
>>>> config change (assumes that restart time is not the issue).
>>>> - Your job could periodically check for a change and then fail.
>>>> However, you need to fail in a way that the topology is rebuilt. I guess
>>>> it's close to a fatal error and then the driver handles that error and
>>>> restarts.
>>>> - Rewrite the job in such a way that you have only one sink in your job
>>>> graph which demultiplexes the event to several internal sinks. Then you
>>>> could simply add a new sink whenever a new event occurs.
>>>>
>>>> The first option is the easiest and the last option the most versatile
>>>> (could even have different sink types mixed).
>>>>
>>>> On Tue, Jun 23, 2020 at 5:34 AM aj <ajainje...@gmail.com> wrote:
>>>>
>>>>> I am stuck on this. Please give some suggestions.
>>>>>
>>>>> On Tue, Jun 9, 2020, 21:40 aj <ajainje...@gmail.com> wrote:
>>>>>
>>>>>> Please help with this. Any suggestions?
>>>>>>
>>>>>> On Sat, Jun 6, 2020 at 12:20 PM aj <ajainje...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello All,
>>>>>>>
>>>>>>> I am receiving a set of events in Avro format on different topics. I
>>>>>>> want to consume these and write them to S3 in Parquet format.
>>>>>>> I have written the job below, which creates a separate stream for each
>>>>>>> event and fetches its schema from the Confluent Schema Registry to
>>>>>>> create a Parquet sink for that event.
>>>>>>> This is working fine, but the only problem I am facing is that whenever
>>>>>>> a new event starts coming in, I have to change the YAML config and
>>>>>>> restart the job every time. Is there any way to avoid restarting the
>>>>>>> job and have it start consuming the new set of events?
>>>>>>>
>>>>>>>
>>>>>>> YAML config:
>>>>>>>
>>>>>>> !com.bounce.config.EventTopologyConfig
>>>>>>> eventsType:
>>>>>>>   - !com.bounce.config.EventConfig
>>>>>>>     event_name: "search_list_keyless"
>>>>>>>     schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>>>>>>>     topic: "search_list_keyless"
>>>>>>>
>>>>>>>   - !com.bounce.config.EventConfig
>>>>>>>     event_name: "bike_search_details"
>>>>>>>     schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>>>>>>>     topic: "bike_search_details"
>>>>>>>
>>>>>>>   - !com.bounce.config.EventConfig
>>>>>>>     event_name: "keyless_bike_lock"
>>>>>>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>>>>>>>     topic: "analytics-keyless"
>>>>>>>
>>>>>>>   - !com.bounce.config.EventConfig
>>>>>>>     event_name: "keyless_bike_unlock"
>>>>>>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>>>>>>>     topic: "analytics-keyless"
>>>>>>>
>>>>>>> checkPointInterval: 1200000
>>>>>>>
>>>>>>> topics: ["search_list_keyless","bike_search_details","analytics-keyless"]
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Sink code:*
>>>>>>>
>>>>>>> YamlReader reader = new YamlReader(topologyConfig);
>>>>>>> EventTopologyConfig eventTopologyConfig =
>>>>>>>     reader.read(EventTopologyConfig.class);
>>>>>>>
>>>>>>> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
>>>>>>> topics = eventTopologyConfig.getTopics();
>>>>>>>
>>>>>>> List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
>>>>>>>
>>>>>>> CachedSchemaRegistryClient registryClient =
>>>>>>>     new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>>>>>>>
>>>>>>> // One consumer reads all configured topics
>>>>>>> FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(
>>>>>>>     topics,
>>>>>>>     new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>     properties);
>>>>>>>
>>>>>>> DataStream<GenericRecord> dataStream = streamExecutionEnvironment
>>>>>>>     .addSource(flinkKafkaConsumer)
>>>>>>>     .name("source");
>>>>>>>
>>>>>>> try {
>>>>>>>     for (EventConfig eventConfig : eventTypesList) {
>>>>>>>
>>>>>>>         // "{}" is the logger's placeholder for the event name
>>>>>>>         LOG.info("creating a stream for {}", eventConfig.getEvent_name());
>>>>>>>
>>>>>>>         // One Parquet sink per event, built from the event's schema
>>>>>>>         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>>>>>>             .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
>>>>>>>                 SchemaUtils.getSchema(eventConfig.getSchema_subject(),
>>>>>>>                     registryClient)))
>>>>>>>             .withBucketAssigner(new EventTimeBucketAssigner())
>>>>>>>             .build();
>>>>>>>
>>>>>>>         // Keep only the records that belong to this event
>>>>>>>         DataStream<GenericRecord> outStream = dataStream.filter(
>>>>>>>             (FilterFunction<GenericRecord>) genericRecord ->
>>>>>>>                 genericRecord != null
>>>>>>>                     && genericRecord.get(EVENT_NAME).toString()
>>>>>>>                         .equals(eventConfig.getEvent_name()));
>>>>>>>
>>>>>>>         outStream.addSink(sink)
>>>>>>>             .name(eventConfig.getSink_id())
>>>>>>>             .setParallelism(parallelism);
>>>>>>>     }
>>>>>>> } catch (Exception e) {
>>>>>>>     e.printStackTrace();
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanks & Regards,
>>>>>>> Anuj Jain
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>>> (Toni) Cheng
>>>>
>>>
>>
>>
>
>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>
