Hi Jessy,

Can I add a new sink into the execution graph at runtime, for example : a
> new Kafka producer , without restarting the current application  or using
> option1 ?
>

No, there is no way to add a sink without restart currently. Could you
elaborate why a restart is not an option for you?

You can use Option 2, which means that you implement 1 source and 1 sink
which will dynamically read from or write to different topics possibly by
wrapping the existing source and sink. This is a rather complex task that I
would not recommend to a new Flink user.

If you have a known set of possible sink topics, another option would be to
add all sinks from the go and only route messages dynamically with
side-outputs. However, I'm not aware that such a pattern exists for
sources. Although with the new source interface, it should be possible to
do that.

On Wed, Mar 17, 2021 at 7:12 AM Jessy Ping <tech.user.str...@gmail.com>
wrote:

> Hi Team,
>
> Can you provide your thoughts on this, it will be helpful ..
>
> Thanks
> Jessy
>
> On Tue, 16 Mar 2021 at 21:29, Jessy Ping <tech.user.str...@gmail.com>
> wrote:
>
>> Hi Timo/Team,
>> Thanks for the reply.
>>
>> Just take the example from the following pseduo code,
>> Suppose , this is the current application logic.
>>
>> firstInputStream = addSource(...)* //Kafka consumer C1*
>> secondInputStream =  addSource(...) *//Kafka consumer C2*
>>
>> outputStream = firstInputStream,keyBy(a -> a.key)
>> .connect(secondInputStream.keyBy(b->b.key))
>> .coProcessFunction(....)
>> * // logic determines : whether a new sink should be added to the
>> application or not ?. If not: then the event will be produced to the
>> existing sink(s). If a new sink is required: produce the events to the
>> existing sinks + the new one*
>> sink1 = addSink(outPutStream). //Kafka producer P1
>> .
>> .
>> .
>> sinkN =  addSink(outPutStream). //Kafka producer PN
>>
>> *Questions*
>> --> Can I add a new sink into the execution graph at runtime, for example
>> : a new Kafka producer , without restarting the current application  or
>> using option1 ?
>>
>> -->  (Option 2 )What do you mean by adding a custom sink at
>> coProcessFunction , how will it change the execution graph ?
>>
>> Thanks
>> Jessy
>>
>>
>>
>> On Tue, 16 Mar 2021 at 17:45, Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Jessy,
>>>
>>> to be precise, the JobGraph is not used at runtime. It is translated
>>> into an ExecutionGraph.
>>>
>>> But nevertheless such patterns are possible but require a bit of manual
>>> implementation.
>>>
>>> Option 1) You stop the job with a savepoint and restart the application
>>> with slightly different parameters. If the pipeline has not changed
>>> much, the old state can be remapped to the slightly modified job graph.
>>> This is the easiest solution but with the downside of maybe a couple of
>>> seconds downtime.
>>>
>>> Option 2) You introduce a dedicated control stream (i.e. by using the
>>> connect() DataStream API [1]). Either you implement a custom sink in the
>>> main stream of the CoProcessFunction. Or you enrich every record in the
>>> main stream with sink parameters that are read by you custom sink
>>> implementation.
>>>
>>> I hope this helps.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> [1]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect
>>>
>>> On 16.03.21 12:37, Jessy Ping wrote:
>>> > Hi Team,
>>> > Is it possible to edit the job graph at runtime ? . Suppose, I want to
>>> > add a new sink to the flink application at runtime that depends upon
>>> > the  specific parameters in the incoming events.Can i edit the
>>> jobgraph
>>> > of a running flink application ?
>>> >
>>> > Thanks
>>> > Jessy
>>>
>>>

Reply via email to