Could anyone please help with this? Any suggestions are welcome.

On Sat, Jun 6, 2020 at 12:20 PM aj <ajainje...@gmail.com> wrote:

> Hello All,
>
> I am receiving a set of events in Avro format on different topics. I want
> to consume them and write them to S3 in Parquet format.
> I have written the job below, which creates a separate stream for each
> event and fetches its schema from the Confluent Schema Registry to create
> a Parquet sink for that event.
> This works fine, but the one problem I am facing is that whenever a new
> event starts coming in, I have to change the YAML config and restart the
> job every time. Is there any way to have the job start consuming a new set
> of events without a restart?
>
>
> YAML config :
>
> !com.bounce.config.EventTopologyConfig
> eventsType:
>   - !com.bounce.config.EventConfig
>     event_name: "search_list_keyless"
>     schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>     topic: "search_list_keyless"
>
>   - !com.bounce.config.EventConfig
>     event_name: "bike_search_details"
>     schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>     topic: "bike_search_details"
>
>   - !com.bounce.config.EventConfig
>     event_name: "keyless_bike_lock"
>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>     topic: "analytics-keyless"
>
>   - !com.bounce.config.EventConfig
>     event_name: "keyless_bike_unlock"
>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>     topic: "analytics-keyless"
>
>
> checkPointInterval: 1200000
>
> topics: ["search_list_keyless","bike_search_details","analytics-keyless"]
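A side note on the config above: the top-level topics list repeats the topic field of each entry, so the two can drift apart when a new event is added. Here is a minimal sketch, assuming the list could instead be derived from the entries. EventConfigSketch and TopicListSketch are hypothetical stand-ins, not the real com.bounce.config classes:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical stand-in for com.bounce.config.EventConfig (only the fields used here).
final class EventConfigSketch {
    final String eventName;
    final String topic;

    EventConfigSketch(String eventName, String topic) {
        this.eventName = eventName;
        this.topic = topic;
    }
}

final class TopicListSketch {
    // Collect each entry's topic once, preserving first-seen order.
    static List<String> distinctTopics(List<EventConfigSketch> events) {
        LinkedHashSet<String> seen = new LinkedHashSet<>();
        for (EventConfigSketch e : events) {
            seen.add(e.topic);
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<EventConfigSketch> events = List.of(
            new EventConfigSketch("search_list_keyless", "search_list_keyless"),
            new EventConfigSketch("bike_search_details", "bike_search_details"),
            new EventConfigSketch("keyless_bike_lock", "analytics-keyless"),
            new EventConfigSketch("keyless_bike_unlock", "analytics-keyless"));
        // Two events share "analytics-keyless", so it appears only once.
        System.out.println(distinctTopics(events));
    }
}
```

With the four entries from the config, this yields the same three topics that are currently listed by hand.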
>
>
>
>
>
> *Sink code:*
>
>     YamlReader reader = new YamlReader(topologyConfig);
>     EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);
>
>     long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
>     topics = eventTopologyConfig.getTopics();
>     List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
>
>     CachedSchemaRegistryClient registryClient =
>         new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>
>     // One Kafka source reads every configured topic.
>     FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
>         topics,
>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>         properties);
>
>     DataStream<GenericRecord> dataStream =
>         streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>
>     try {
>         for (EventConfig eventConfig : eventTypesList) {
>             LOG.info("creating a stream for {}", eventConfig.getEvent_name());
>
>             // Parquet sink built from the event's registered Avro schema.
>             final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>                 .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
>                     SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
>                 .withBucketAssigner(new EventTimeBucketAssigner())
>                 .build();
>
>             // Keep only the records that belong to this event; skip records
>             // that are null or have no event-name field.
>             DataStream<GenericRecord> outStream = dataStream.filter(
>                 (FilterFunction<GenericRecord>) genericRecord ->
>                     genericRecord != null
>                         && genericRecord.get(EVENT_NAME) != null
>                         && genericRecord.get(EVENT_NAME).toString()
>                             .equals(eventConfig.getEvent_name()));
>
>             outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
>         }
>     } catch (Exception e) {
>         LOG.error("failed to build the event sinks", e);
>     }
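The loop above builds one filter per configured event. As a rough illustration of the routing idea only, the sketch below groups records by their event-name field in plain Java. It uses a Map as a stand-in for GenericRecord and assumes the field is called "event_name" (the real value of EVENT_NAME is not shown in this mail). Records where that field is missing are skipped instead of causing a NullPointerException. EventRouterSketch and its methods are hypothetical names; none of this is Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EventRouterSketch {
    // Assumed field name; the real EVENT_NAME constant is not shown in the mail.
    static final String EVENT_NAME = "event_name";

    // Returns the event name, or null when the record or the field is missing.
    static String eventNameOf(Map<String, Object> record) {
        if (record == null) {
            return null;
        }
        Object name = record.get(EVENT_NAME);
        return name == null ? null : name.toString();
    }

    // Groups records by event name without knowing the names in advance.
    static Map<String, List<Map<String, Object>>> groupByEventName(
            List<Map<String, Object>> records) {
        Map<String, List<Map<String, Object>>> groups = new LinkedHashMap<>();
        for (Map<String, Object> record : records) {
            String name = eventNameOf(record);
            if (name == null) {
                continue; // nothing to route on
            }
            groups.computeIfAbsent(name, k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, Object> lock = new HashMap<>();
        lock.put(EVENT_NAME, "keyless_bike_lock");
        Map<String, Object> unlock = new HashMap<>();
        unlock.put(EVENT_NAME, "keyless_bike_unlock");
        Map<String, Object> unnamed = new HashMap<>(); // has no event-name field

        Map<String, List<Map<String, Object>>> groups =
            groupByEventName(Arrays.asList(lock, unlock, null, unnamed, lock));
        groups.forEach((name, recs) -> System.out.println(name + ": " + recs.size()));
    }
}
```

Grouping this way does not need the set of event names up front, although each Parquet writer would still need its schema from the registry, as in the job above.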
>
>
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
