Re: Flink Stream job to parquet sink

2020-06-30 Thread aj
Thanks, Arvid. I will try to implement using the broadcast approach.

On Fri, Jun 26, 2020 at 1:10 AM Arvid Heise  wrote:

> Hi Anuj,
>
> Yes, broadcast sounds really good. Now you just need to hide the
> structural invariance (variable number of sinks) by delegating to inner
> sinks.
>
> public class SplittingStreamingFileSink
>     extends RichSinkFunction
>     implements CheckpointedFunction, CheckpointListener {
>
>   Map sinks = ...; // event name to sink
>
>   // delegate each method of RichSinkFunction, CheckpointedFunction,
>   // CheckpointListener to sinks
>
>   public void invoke(IN value, SinkFunction.Context context) throws Exception {
>     StreamingFileSink sink = sinks.get(value.getEventName());
>
>     if (sink == null) {
>       // create new StreamingFileSink, add to sinks and call initializeState
>     }
>
>     sink.invoke(value, context);
>   }
> }
>
> Use the SplittingStreamingFileSink as the only sink in your workflow. You
> definitely need to wrap the SinkFunction.Context such that state gets
> prefixes (eventName), but it should be rather straightforward.
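
The routing idea in the quoted class can be modelled without any Flink dependency. The following is a minimal sketch, not the real sink: `InnerSink`, `CollectingSink`, `DemuxSink` and `Event` are hypothetical names standing in for `StreamingFileSink` and the Avro record, and checkpoint delegation and state prefixing are left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Flink-free model of one outer sink that routes records to lazily created inner sinks. */
public class DemuxSinkSketch {

    /** Hypothetical stand-in for an inner sink such as StreamingFileSink. */
    interface InnerSink<T> {
        void invoke(T value);
    }

    /** Inner sink that only remembers what it received, so the routing can be inspected. */
    static final class CollectingSink<T> implements InnerSink<T> {
        final List<T> received = new ArrayList<>();

        @Override
        public void invoke(T value) {
            received.add(value);
        }
    }

    /** Hypothetical record type: an event name plus an opaque payload. */
    static final class Event {
        final String name;
        final String payload;

        Event(String name, String payload) {
            this.name = name;
            this.payload = payload;
        }
    }

    /** The single outer sink: one inner sink per key, created on first use. */
    static final class DemuxSink<T> {
        private final Function<T, String> keyOf;
        private final Function<String, InnerSink<T>> factory;
        private final Map<String, InnerSink<T>> sinks = new HashMap<>();

        DemuxSink(Function<T, String> keyOf, Function<String, InnerSink<T>> factory) {
            this.keyOf = keyOf;
            this.factory = factory;
        }

        void invoke(T value) {
            // Create the inner sink the first time a key is seen, then delegate.
            sinks.computeIfAbsent(keyOf.apply(value), factory).invoke(value);
        }

        Map<String, InnerSink<T>> sinks() {
            return sinks;
        }
    }

    public static void main(String[] args) {
        DemuxSink<Event> sink = new DemuxSink<>(e -> e.name, name -> new CollectingSink<>());
        sink.invoke(new Event("keyless_bike_lock", "a"));
        sink.invoke(new Event("keyless_bike_unlock", "b"));
        sink.invoke(new Event("keyless_bike_lock", "c"));
        System.out.println(sink.sinks().size()); // 2
    }
}
```

In a real job the factory would build a `StreamingFileSink` for the event's schema, and the outer class would also have to forward the checkpoint callbacks to every inner sink, as the quoted message points out.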

Re: Flink Stream job to parquet sink

2020-06-25 Thread aj
Thanks, Arvid, for the detailed answers.

> - Have some kind of submitter that restarts Flink automatically on config
> change (assumes that restart time is not the issue).

Yes, that can be written, but it does not solve the problem completely
because I want to avoid the job restart itself. Every time I restart, I also
have to restore from the last checkpoint in S3.

> - Your job could periodically check for a change and then fail. However,
> you need to fail in a way that the topology is rebuilt. I guess it's close
> to a fatal error and then the driver handles that error and restarts.

Same reason as for the first option, and I am also not sure how it would be
implemented.

> - Rewrite the job in such a way that you have only one sink in your job
> graph which demultiplexes the events to several internal sinks. Then you
> could simply add a new sink whenever a new event occurs.

This is the option I want to implement. Please give me some idea of how it
could be done.

@Arvid Heise
I am wondering whether I can use broadcast state, with the config rules that
I currently keep in YAML pushed through Kafka instead. I mainly just need the
mapping from event name to Avro schema subject.
Please correct me if I am thinking in the wrong direction.
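
The broadcast idea above comes down to two inputs feeding one lookup table: config messages update the mapping, and data records read it. Below is a minimal, Flink-free sketch of that lookup. `RuleStore`, `onConfig` and `subjectFor` are hypothetical names; in a Flink job the two methods would roughly correspond to `processBroadcastElement` and `processElement` of a `BroadcastProcessFunction`, with the map held in broadcast state rather than in a plain field.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Flink-free model of the event-name -> schema-subject lookup (all names hypothetical). */
public class BroadcastRuleSketch {

    /** Holds the current rules; stands in for the contents of broadcast state. */
    static final class RuleStore {
        private final Map<String, String> subjectByEvent = new HashMap<>();

        /** Called for each config message; a later message for the same event replaces the earlier one. */
        void onConfig(String eventName, String schemaSubject) {
            subjectByEvent.put(eventName, schemaSubject);
        }

        /** Called for each data record; empty when no rule has arrived yet for this event. */
        Optional<String> subjectFor(String eventName) {
            return Optional.ofNullable(subjectByEvent.get(eventName));
        }
    }

    public static void main(String[] args) {
        RuleStore rules = new RuleStore();

        // Before any config message arrives, the event is unknown.
        System.out.println(rules.subjectFor("keyless_bike_lock").isPresent()); // false

        // A config message, as it might be read from a Kafka config topic.
        rules.onConfig("keyless_bike_lock",
            "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock");

        System.out.println(rules.subjectFor("keyless_bike_lock").isPresent()); // true
    }
}
```

What to do with records whose event name has no rule yet (drop, buffer, or route to a default location) is a design decision this sketch leaves open.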




Re: Flink Stream job to parquet sink

2020-06-25 Thread Rafi Aroch
Hi Arvid,

Would it be possible to implement a BucketAssigner that for example loads
the configuration periodically from an external source and according to the
event type decides on a different sub-folder?

Thanks,
Rafi
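
To make the sub-folder idea concrete: in Flink, a `BucketAssigner` returns a bucket id from its `getBucketId(element, context)` method, and that id becomes the sub-directory under the sink's base path. The sketch below shows only the id computation in plain Java. The `event_name/dt=yyyy-MM-dd` layout and the names `EventBucketSketch` and `bucketId` are assumptions for illustration, and the periodic loading of external configuration is not shown.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Plain-Java sketch of deriving a per-event-type bucket id (layout is an assumption). */
public class EventBucketSketch {

    // Day-level partition in UTC; the pattern is a hypothetical choice.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    /** Builds "eventName/dt=yyyy-MM-dd" from the event name and its timestamp in epoch millis. */
    static String bucketId(String eventName, long eventTimeMillis) {
        return eventName + "/dt=" + DAY.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    public static void main(String[] args) {
        // 1593043200000 ms is 2020-06-25T00:00:00Z.
        System.out.println(bucketId("keyless_bike_lock", 1593043200000L));
        // prints keyless_bike_lock/dt=2020-06-25
    }
}
```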



Re: Flink Stream job to parquet sink

2020-06-24 Thread Arvid Heise
Hi Anuj,

There is currently no way to dynamically change the topology. It would be
good to know why your current approach is not working (restart taking too
long? Too frequent changes?)

So some ideas:
- Have some kind of submitter that restarts Flink automatically on config
change (assumes that restart time is not the issue).
- Your job could periodically check for a change and then fail. However,
you need to fail in a way that the topology is rebuilt. I guess it's close
to a fatal error, and then the driver handles that error and restarts.
- Rewrite the job in such a way that you have only one sink in your job
graph which demultiplexes the events to several internal sinks. Then you
could simply add a new sink whenever a new event occurs.

The first option is the easiest and the last option the most versatile
(could even have different sink types mixed).


-- 

Arvid Heise | Senior Java Developer

Re: Flink Stream job to parquet sink

2020-06-22 Thread aj
I am stuck on this. Please give some suggestions.



Re: Flink Stream job to parquet sink

2020-06-09 Thread aj
Please help with this. Any suggestions?



-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Flink Stream job to parquet sink

2020-06-06 Thread aj
Hello All,

I am receiving a set of events in Avro format on different topics. I want
to consume these and write them to S3 in Parquet format.
I have written the job below, which creates a separate stream for each event
and fetches its schema from the Confluent Schema Registry to create a
Parquet sink for that event.
This works fine, but whenever a new event type starts arriving I have to
change the YAML config and restart the job. Is there any way to start
consuming a new set of events without restarting the job?


YAML config :

!com.bounce.config.EventTopologyConfig
eventsType:
  - !com.bounce.config.EventConfig
    event_name: "search_list_keyless"
    schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
    topic: "search_list_keyless"

  - !com.bounce.config.EventConfig
    event_name: "bike_search_details"
    schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
    topic: "bike_search_details"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_lock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
    topic: "analytics-keyless"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_unlock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
    topic: "analytics-keyless"

checkPointInterval: 120

topics: ["search_list_keyless","bike_search_details","analytics-keyless"]





*Sink code :*

// Load the topology config (event name, schema subject, topic) from YAML.
YamlReader reader = new YamlReader(topologyConfig);
EventTopologyConfig eventTopologyConfig =
    reader.read(EventTopologyConfig.class);

long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
topics = eventTopologyConfig.getTopics();

List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();

CachedSchemaRegistryClient registryClient =
    new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

// A single Kafka source reads all topics as generic Avro records.
FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer =
    new FlinkKafkaConsumer<>(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

DataStream<GenericRecord> dataStream =
    streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {

        // The "{}" placeholder assumes an SLF4J-style logger.
        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        // One Parquet sink per event type, built from the registry schema.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(path,
                ParquetAvroWriters.forGenericRecord(
                    SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
            .withBucketAssigner(new EventTimeBucketAssigner())
            .build();

        // Keep only the records that belong to this event type.
        DataStream<GenericRecord> outStream =
            dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
                genericRecord != null
                    && genericRecord.get(EVENT_NAME).toString()
                        .equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}




-- 
Thanks & Regards,
Anuj Jain