Hi Vino,

Yes, in the source function it is possible. But Vijay asked for it to happen "before it hits the Source", so IMO that part is outside of the Flink workflow.
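If doing the transformation inside the source is good enough, one option (just a rough, untested sketch, not the only way) is to give the FlinkKinesisConsumer a custom KinesisDeserializationSchema, so each record is converted into the target format before the source ever emits it downstream. The stream name, the aws.region value, and the transformRecord() helper below are made-up placeholders, and the exact interface signature can vary a bit between Flink versions, so please check it against the connector version you have:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

public class PreProcessInSourceJob {

    // Deserializes each raw Kinesis record and transforms it right away,
    // so downstream operators only ever see the new format.
    public static class TransformingSchema
            implements KinesisDeserializationSchema<Map<String, Object>> {

        @Override
        public Map<String, Object> deserialize(byte[] recordValue, String partitionKey,
                String seqNum, long approxArrivalTimestamp, String stream, String shardId)
                throws IOException {
            String raw = new String(recordValue, StandardCharsets.UTF_8);
            // the pre-processing happens here, inside the source
            return transformRecord(raw);
        }

        @Override
        public TypeInformation<Map<String, Object>> getProducedType() {
            return TypeInformation.of(new TypeHint<Map<String, Object>>() {});
        }

        // Placeholder for the real transformation logic (whatever
        // TransformFunction was supposed to do).
        private Map<String, Object> transformRecord(String raw) {
            Map<String, Object> out = new HashMap<>();
            out.put("payload", raw);
            return out;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Region is a placeholder; credentials and the rest of the consumer
        // config are omitted here for brevity.
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("aws.region", "us-east-1");

        FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer =
                new FlinkKinesisConsumer<>("my-kinesis-stream", new TransformingSchema(), consumerConfig);

        // The stream already carries the transformed records.
        DataStream<Map<String, Object>> kinesisStream = env.addSource(kinesisConsumer);
        kinesisStream.print();

        env.execute("pre-process inside the source");
    }
}

If the transformation really has to happen before the data even reaches the source (i.e. before Kinesis), then it has to live on the producer side, outside of Flink, as I said before.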
Best,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com <https://felipeogutierrez.blogspot.com>*


On Tue, Nov 26, 2019 at 10:09 AM vino yang <yanghua1...@gmail.com> wrote:

> Hi Felipe,
>
> Why do you think it's not possible?
>
> My thought is that we can do the data pre-processing in the source
> function. If so, the source function would consume the upstream events,
> do the pre-processing, and then emit the results downstream.
>
> Best,
> Vino
>
>
> On Tue, Nov 26, 2019 at 4:56 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
>> I am afraid that this is not possible in Flink, since the entry point of
>> all transformations is the source function. Everything that we can
>> pre-process is done in the source function or in the downstream operators.
>> If you want to pre-process something before the data hits the source,
>> you will have to rely on the broker/storage/queue that the source
>> consumes your data from, not on Flink.
>>
>> Best,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com>*
>>
>>
>> On Tue, Nov 26, 2019 at 2:57 AM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Vijay,
>>>
>>> IMO, the semantics of the source are not fixed. A source can integrate
>>> with third-party systems and consume events, but it can also contain
>>> more business logic to pre-process your data after consuming the events.
>>>
>>> Maybe it needs some customization. WDYT?
>>>
>>> Best,
>>> Vino
>>>
>>> On Tue, Nov 26, 2019 at 6:45 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I need to pre-process data (transform the incoming data to a different
>>>> format) before it hits the Source I have defined. How can I do that?
>>>>
>>>> I tried to use a .map on the DataStream, but that is too late because
>>>> the data has already hit the Source I defined:
>>>>
>>>> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer =
>>>>     getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
>>>>         region, getRecsMax, getRecsIntervalMs, connectionTimeout,
>>>>         maxConnections, socketTimeout);
>>>> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
>>>>     env.addSource(kinesisConsumer);
>>>>
>>>> DataStream<Map<String, Object>> kinesisStream1 =
>>>>     monitoringDataStreamSource.map(new TransformFunction(...)); // too late here
>>>>
>>>> TIA,
>>>>
>>>