Hi Vino,

Yes, in the source function it is possible. But you said "before it hits
the Source", so IMO that is outside of the Flink workflow.

Best,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Tue, Nov 26, 2019 at 10:09 AM vino yang <yanghua1...@gmail.com> wrote:

> Hi Felipe,
>
> Why do you think it's not possible?
>
> My thought is that we can do the data pre-processing in the source function.
> If so, the source function would consume upstream events, pre-process them,
> and then emit them downstream.
>
> Best,
> Vino
>
>
> On Tue, Nov 26, 2019 at 4:56 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
>> I am afraid that this is not possible in Flink, since the entry point of
>> all transformations is the source function. Any pre-processing has to
>> happen either in the source function or in the downstream operators.
>> If you want to pre-process something before the data hits the source, you
>> will have to rely on the broker/storage/queue from which the source consumes
>> your data, not on Flink.
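
To make the broker-side option concrete, a minimal sketch of a small relay that runs outside Flink: it reads raw records from one stream, rewrites them, and publishes them to a second stream that the Flink source would read instead. The in-memory `BlockingQueue`s only stand in for the two broker streams; `UpstreamRelay`, `csvToKeyValue`, and the CSV input format are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical relay that lives outside the Flink job. The two queues stand in
// for a "raw" stream and a "clean" stream on the broker; the Flink source would
// be configured to consume only the clean one.
class UpstreamRelay {
    private final BlockingQueue<String> rawStream;
    private final BlockingQueue<String> cleanStream;
    private final Function<String, String> transform;

    UpstreamRelay(BlockingQueue<String> rawStream,
                  BlockingQueue<String> cleanStream,
                  Function<String, String> transform) {
        this.rawStream = rawStream;
        this.cleanStream = cleanStream;
        this.transform = transform;
    }

    // Moves every record currently waiting in the raw stream; returns the count.
    int drainOnce() {
        List<String> batch = new ArrayList<>();
        rawStream.drainTo(batch);
        for (String record : batch) {
            cleanStream.add(transform.apply(record));
        }
        return batch.size();
    }

    // Hypothetical format change: "host,cpu" CSV becomes "host=...;cpu=...".
    static String csvToKeyValue(String csv) {
        String[] fields = csv.split(",", -1);
        if (fields.length != 2) {
            throw new IllegalArgumentException("expected host,cpu but got: " + csv);
        }
        return "host=" + fields[0].trim() + ";cpu=" + fields[1].trim();
    }

    public static void main(String[] args) {
        BlockingQueue<String> raw = new LinkedBlockingQueue<>(List.of("a,0.5", "b,0.9"));
        BlockingQueue<String> clean = new LinkedBlockingQueue<>();
        new UpstreamRelay(raw, clean, UpstreamRelay::csvToKeyValue).drainOnce();
        System.out.println(clean); // [host=a;cpu=0.5, host=b;cpu=0.9]
    }
}
```

What to do with records that fail the transformation (drop, dead-letter, retry) is left open here; this sketch simply throws, which would also abandon the rest of the drained batch.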
>>
>> Best,
>> Felipe
>>
>>
>> On Tue, Nov 26, 2019 at 2:57 AM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Vijay,
>>>
>>> IMO, the semantics of the source are not fixed. A source can integrate
>>> with third-party systems and consume events, but it can also contain
>>> additional business logic that pre-processes your data after consuming
>>> the events.
>>>
>>> Maybe it needs some customization. WDYT?
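
One way to read "customization" here is to wrap an existing source instead of rewriting it, so the third-party integration stays untouched and only its output is intercepted. A minimal, framework-free sketch under that assumption: `SimpleSource`, `PreprocessedSource`, and the lower-casing transformation are hypothetical stand-ins, not Flink interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical minimal source contract (not Flink's SourceFunction): a source
// pushes every record it produces into the given emitter.
interface SimpleSource<T> {
    void run(Consumer<T> emit);
}

// Decorator: reuses an existing source unchanged and applies a pre-processing
// step to each record before it reaches downstream code.
class PreprocessedSource<R, T> implements SimpleSource<T> {
    private final SimpleSource<R> inner;
    private final Function<R, T> preprocess;

    PreprocessedSource(SimpleSource<R> inner, Function<R, T> preprocess) {
        this.inner = inner;
        this.preprocess = preprocess;
    }

    @Override
    public void run(Consumer<T> emit) {
        // Intercept each record the inner source emits, transform it, pass it on.
        inner.run(raw -> emit.accept(preprocess.apply(raw)));
    }

    public static void main(String[] args) {
        // Stand-in for an existing third-party source that emits raw strings.
        SimpleSource<String> existing = emit -> {
            emit.accept("CPU_HIGH");
            emit.accept("DISK_FULL");
        };
        SimpleSource<String> wrapped =
            new PreprocessedSource<String, String>(existing, s -> s.toLowerCase(Locale.ROOT));
        List<String> out = new ArrayList<>();
        wrapped.run(out::add);
        System.out.println(out); // [cpu_high, disk_full]
    }
}
```

Note that the wrapper only sees records after the inner source has produced them, so it cannot help with data that the inner source fails to read in the first place.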
>>>
>>> Best,
>>> Vino
>>>
>>> On Tue, Nov 26, 2019 at 6:45 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I need to pre-process data (transform incoming data to a different format)
>>>> before it hits the Source I have defined. How can I do that?
>>>>
>>>> I tried to use a .map on the DataStream, but that is too late, as the
>>>> data has already hit the Source I defined:
>>>>
>>>> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer =
>>>>     getMonitoringFlinkKinesisConsumer(local, localKinesis, kinesisTopicRead,
>>>>         region, getRecsMax, getRecsIntervalMs, connectionTimeout,
>>>>         maxConnections, socketTimeout);
>>>> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
>>>>     env.addSource(kinesisConsumer);
>>>>
>>>> DataStream<Map<String, Object>> kinesisStream1 =
>>>>     monitoringDataStreamSource.map(new TransformFunction(...)); // too late here
>>>>
>>>> TIA,
>>>>
>>>
