Flink 1.17 upgrade issue when using Azure storage account for checkpoints/savepoints

2023-03-25 Thread Jessy Ping
Hi Team.

The application failed to start after upgrading from Flink 1.16.1 to 1.17.0,
in both Kubernetes and Docker.

I didn't make any changes to the Flink configuration related to savepoints
and checkpoints.


Root cause: Caused by: java.util.concurrent.CompletionException:
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found

Please find the attachment for the complete error trace.

Are there any breaking changes in Flink 1.17.0 in terms of the Azure file
system?

Thank you
Jessy


error.json
Description: application/json


Regarding Flink metrics

2022-02-01 Thread Jessy Ping
Hi Team,

We are using Datadog and its HTTP reporter (packaged in the Flink image) for
sending metrics from our Flink application. We have a requirement to set
tags with values calculated at runtime for the custom metrics emitted from
Flink. Currently it seems impossible to assign tags at runtime. Is there a
workaround for this?
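
For context, a minimal sketch of the usual workaround, assuming the tag value
is known when the operator starts (the "region" tag and TaggedMetricsFunction
are illustrative, not from the thread): user scopes added with
addGroup(key, value) become metric variables, which the Datadog reporter
exports as tags, but they are fixed when the metric is registered (e.g. in
open()), not per record.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class TaggedMetricsFunction extends RichMapFunction<String, String> {
  private final String region;          // tag value known before the job starts
  private transient Counter processed;

  public TaggedMetricsFunction(String region) {
    this.region = region;
  }

  @Override
  public void open(Configuration parameters) {
    processed = getRuntimeContext()
        .getMetricGroup()
        .addGroup("region", region)     // exported as a key/value tag by the reporter
        .counter("processedEvents");
  }

  @Override
  public String map(String value) {
    processed.inc();                    // counted under the tags fixed at registration time
    return value;
  }
}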

Thanks
Jessy


Re: Regarding Queryable state in Flink

2022-01-25 Thread Jessy Ping
Hi Matthias,

I want to query the current state of the application in real time, so the
State Processor API won't fit here. I have the following questions:

* Is queryable state stable enough to use in production systems?

* Are there any improvements or development activities planned or ongoing
for queryable state? Is the Flink community still supporting queryable state
and recommending its usage?
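
For context, a minimal sketch (not from the thread) of how a keyed state
would be exposed and then queried with the queryable state client; the state
name, proxy host/port and job id are illustrative, and the queryable state
proxy must be enabled on the cluster.

// imports: org.apache.flink.api.common.JobID, org.apache.flink.api.common.state.*,
//          org.apache.flink.api.common.typeinfo.*, java.util.concurrent.CompletableFuture,
//          org.apache.flink.queryablestate.client.QueryableStateClient

// Inside the keyed function: mark the (RocksDB-backed) state as queryable.
ValueStateDescriptor<Long> descriptor =
    new ValueStateDescriptor<>("enrichment-count", Types.LONG);
descriptor.setQueryable("enrichment-count-query");   // external query handle
ValueState<Long> state = getRuntimeContext().getState(descriptor);

// From an external client: query the live value for a single key.
QueryableStateClient client =
    new QueryableStateClient("flink-queryable-state-proxy", 9069);
CompletableFuture<ValueState<Long>> future =
    client.getKvState(
        JobID.fromHexString("<job-id>"),              // id of the running job
        "enrichment-count-query",
        "some-key",
        BasicTypeInfo.STRING_TYPE_INFO,
        descriptor);
future.thenAccept(s -> {
  try {
    System.out.println("current value: " + s.value());
  } catch (Exception e) {
    e.printStackTrace();
  }
});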



Thanks Jessy


On Tue, Jan 25, 2022, 10:19 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Jessy,
>
>
>
> Have you considered using the state processor api [1] for offline analysis
> of checkpoints and savepoints?
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
> *From:* Jessy Ping 
> *Sent:* Montag, 24. Januar 2022 16:47
> *To:* user 
> *Subject:* Regarding Queryable state in Flink
>
>
>
> Hi Team,
>
>
>
> We are currently running our streaming application based Flink(Datastream
> API ) on a non-prod cluster.And planning to move it to production cluster
> soon.. We are keeping cerating keyed state backed by rocksdb in the flink
> application. We need a mechanism to query these keyed state values for
> debugging and troubleshooting. Is it a good idea to use Queryable state for
> a single link-job running in application-mode on kubernetes for an average
> load of 10k events/second ?.
>
> Or is it a better idea to keep these state values in an external k,v store
> ?.
>
>
>
> So in short --> Is the queryable state stable enough to use in production
> systems ?
>
>
>
>
>
> Thanks
>
> Jessy
>


Regarding Queryable state in Flink

2022-01-24 Thread Jessy Ping
Hi Team,

We are currently running our streaming application, based on Flink
(DataStream API), on a non-prod cluster and are planning to move it to a
production cluster soon. We keep certain keyed state backed by RocksDB in
the Flink application. We need a mechanism to query these keyed state values
for debugging and troubleshooting. Is it a good idea to use queryable state
for a single Flink job running in application mode on Kubernetes with an
average load of 10k events/second?
Or is it a better idea to keep these state values in an external key-value
store?

So in short --> Is queryable state stable enough to use in production
systems?


Thanks
Jessy


Re: Flink (DataStream) in Kubernetes

2022-01-18 Thread Jessy Ping
Hi Team,
Any insights on the mail below would be helpful.

Thanks
Jessy

On Fri, Jan 14, 2022, 11:09 PM Jessy Ping 
wrote:

> Hi Team,
>
> We are planning to run the below pipeline as a standalone Flink
> application cluster on kubernetes. It will be better if the community can
> share their insights regarding the below questions.
>
> [image: image.png]
> We can describe the pipeline as follows,
>
>1. Combine the realtime streams from S1, enrichment data from S2 and
>S3 using Union Operator. Partition the stream based on value1 for keeping
>the enrichment data locally available.
>2. Broadcast the rules to process the data from S4.
>3. Connect the above two streams(1&2) and process the real time events
>from S1 using the enrichment data from S2 and S3 stored in rocksDB state as
>per the rules stored in broadcast state inside the keyed broadcast process
>function.
>4. Produce the transformed results to a Kafka Sink.
>
> Note: Kafka Source S1 has 32 partitions. Suppose we have 1 million
> distinct keys and expect 10k events/s from S1.
>
> Approach 1: Application cluster with 16 task managers. Each task manager
> has 2 slots and 2 CPUs.
> Approach 2: Application cluster with 2 task managers. Each task manager
> has 16 slots and 16 CPUs.
>
> *Questions*
>
>- Which approach is suitable for a standalone deployment in
>Kubernetes? Do we have some best practises for running Flink applications
>on K8s ?
>- We are planning to connect the source S1, S2 and S3 using Union
>Operator. And these sources have different parallelism settings, equal to
>the available kafka partitions. And the downstream process function has the
>same parallelism as the real-time kafka source S1. Is it a good idea to
>apply union on streams with different parallelisms ?.
>- The size of the broadcast state is around 20mb, so the checkpoint
>size of the broadcast state will be 740mb ( maximum parallelism * size, 32*
>20 ). All events required the entire rules for processing the data, hence
>keeping this in rocksdb is not possible. Is it a good approach to keep a
>large state in broadcast-state?.
>- Is it a good practice to use a singleton pattern in Flink to create
>a local cache of the rules inside the open method of process function ?. If
>data losses due to restart i can repopulate the data using an external
>call. Can I keep these kinds of local caches(created inside open method)
>safely for the entire lifetime of a particular pod/task manager ?
>- Is there any relation between incremental checkpoints and maximum
>number of completed checkpoints (state.checkpoints.num-retained) ?
>- Will the entire state be checkpointed every time irrespective of the
>delta between the checkpoints if I have enabled incremental checkpoints for
>my rocksdb state backend and set the maximum number of completed
>checkpoints to 1 ?
>
> Thanks
> Jessy
>
>


Flink (DataStream) in Kubernetes

2022-01-14 Thread Jessy Ping
Hi Team,

We are planning to run the pipeline below as a standalone Flink application
cluster on Kubernetes. It would be great if the community could share their
insights on the questions below.

[image: image.png]
We can describe the pipeline as follows,

   1. Combine the real-time stream from S1 and the enrichment data from S2 and
   S3 using the union operator. Partition the stream based on value1 to keep the
   enrichment data locally available.
   2. Broadcast the rules from S4 that are used to process the data.
   3. Connect the two streams above (1 & 2) and process the real-time events
   from S1 using the enrichment data from S2 and S3, stored in RocksDB state, as
   per the rules kept in broadcast state inside the keyed broadcast process
   function (see the sketch after the note below).
   4. Produce the transformed results to a Kafka sink.

Note: Kafka source S1 has 32 partitions. Suppose we have 1 million distinct
keys and expect 10k events/s from S1.
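
A minimal structural sketch of steps 1-4 (not from the thread): it uses
String payloads and in-memory elements as stand-ins for the Kafka sources
and sink so it compiles on its own; the real job would use Kafka
sources/sinks and proper POJOs.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class EnrichmentPipelineSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stand-ins for the Kafka sources: S1 (real-time), S2/S3 (enrichment), S4 (rules).
    DataStream<String> s1 = env.fromElements("key1|event");
    DataStream<String> s2 = env.fromElements("key1|enrichA");
    DataStream<String> s3 = env.fromElements("key1|enrichB");
    DataStream<String> s4 = env.fromElements("rule1");

    MapStateDescriptor<String, String> rulesDesc =
        new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);
    BroadcastStream<String> rules = s4.broadcast(rulesDesc);           // step 2

    s1.union(s2, s3)                                                   // step 1
        .keyBy(v -> v.split("\\|")[0])                                 // partition on value1
        .connect(rules)                                                // step 3
        .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {
          @Override
          public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
            // S2/S3 records would update keyed (RocksDB-backed) enrichment state here;
            // S1 records would be transformed using that state plus the broadcast rules.
            out.collect(value);
          }

          @Override
          public void processBroadcastElement(String rule, Context ctx, Collector<String> out)
              throws Exception {
            ctx.getBroadcastState(rulesDesc).put(rule, rule);
          }
        })
        .print();                                              // step 4 would be a Kafka sink

    env.execute("enrichment-pipeline-sketch");
  }
}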

Approach 1: Application cluster with 16 task managers. Each task manager
has 2 slots and 2 CPUs.
Approach 2: Application cluster with 2 task managers. Each task manager has
16 slots and 16 CPUs.

*Questions*

   - Which approach is suitable for a standalone deployment in Kubernetes?
   Are there any best practices for running Flink applications on K8s?
   - We are planning to connect the sources S1, S2 and S3 using the union
   operator. These sources have different parallelism settings, equal to the
   number of available Kafka partitions, and the downstream process function
   has the same parallelism as the real-time Kafka source S1. Is it a good
   idea to apply union on streams with different parallelisms?
   - The size of the broadcast state is around 20 MB, so the checkpoint size
   of the broadcast state will be about 640 MB (parallelism * size, 32 * 20 MB).
   All events require the entire rule set for processing, so keying this
   state in RocksDB is not an option. Is it a good approach to keep a large
   state in broadcast state?
   - Is it a good practice to use a singleton pattern in Flink to create a
   local cache of the rules inside the open method of a process function?
   If data is lost due to a restart, I can repopulate it using an external
   call. Can I keep these kinds of local caches (created inside the open
   method) safely for the entire lifetime of a particular pod/task manager?
   (See the sketch after this list.)
   - Is there any relation between incremental checkpoints and the maximum
   number of retained checkpoints (state.checkpoints.num-retained)?
   - Will the entire state be checkpointed every time, irrespective of the
   delta between checkpoints, if I have enabled incremental checkpoints for
   my RocksDB state backend and set the maximum number of retained
   checkpoints to 1?
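
A hedged sketch of the "local cache created in open()" idea from the
questions above; RuleLoader and its fetchAll() call are hypothetical
stand-ins for the external call mentioned there. A static field is one copy
per TaskManager JVM, shared by all slots in that JVM, and has to be rebuilt
after a restart/failover.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class RuleEnrichFunction extends KeyedProcessFunction<String, String, String> {
  // static => lives as long as the TaskManager JVM, independent of task restarts
  private static volatile Map<String, String> ruleCache;

  @Override
  public void open(Configuration parameters) {
    if (ruleCache == null) {
      synchronized (RuleEnrichFunction.class) {
        if (ruleCache == null) {
          Map<String, String> cache = new ConcurrentHashMap<>();
          cache.putAll(RuleLoader.fetchAll());   // hypothetical external call
          ruleCache = cache;
        }
      }
    }
  }

  @Override
  public void processElement(String value, Context ctx, Collector<String> out) {
    String rule = ruleCache.getOrDefault(ctx.getCurrentKey(), "default-rule");
    out.collect(value + "|" + rule);
  }
}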

Thanks
Jessy


Regarding the size of Flink cluster

2021-12-10 Thread Jessy Ping
Hi All,


I have the following questions regarding the sizing of a Flink cluster
doing stateful computation with the DataStream API. It would be great if the
community could answer the questions or doubts below.



Suppose we have a pipeline as follows,


*Kafka real-time events source 1 & Kafka rules source 2 ->
KeyedBroadcastProcessFunction -> Kafka sink*


As you can see, we will be processing the real-time events from the Kafka
source using the rules broadcast from the rules source, with the help of a
keyed broadcast function.


*Questions*



   - I have a machine with 16 CPUs and 32 GB RAM. Which configuration is
   more efficient for achieving a target parallelism of 16?


   1. A single task manager with 16 task slots.
   2. 16 task managers with 1 task slot and 1 CPU each.




   - If I have broadcast state in my pipeline and a single task manager with
   16 task slots to achieve a target parallelism of 16, does Flink keep 16
   copies of the broadcast state in that single task manager, or is there a
   single copy on the heap shared by all task slots?



   - Does a parallelism of n mean I can process only n events/second (if the
   latency of the pipeline is 1 s)? How many events can a single task slot
   (containing a single task) execute at a time?



   - Can Flink process multiple events from the same key at the same time?



   - I have found the following blog regarding Flink cluster sizing:
   https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines.
   Are there other blogs, testimonials, or books describing sample production
   setups/configurations of a Flink cluster for achieving different ranges of
   throughput?



   - Are there any blogs regarding the results of Flink load testing?


Thanks

Jessy


Re: Stateful Function Ingress issues

2021-06-10 Thread Jessy Ping
Hi all,

I am trying to consume data from Azure Event Hubs using the Kafka ingress and
I am getting the following error:

*java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress
requires a UTF-8 key set for each record.*

When sending the data to the Event Hub with my data producer, I am not
setting any key, and the same data can be consumed without any issues by a
normal Flink application.

I can see the error is raised from
RoutableProtobufKafkaIngressDeserializer.requireNonNullKey().

Why is this check important, and how can I resolve it for Event Hubs?

private byte[] requireNonNullKey(byte[] key) {
  if (key == null) {
    IngressType tpe = ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE;
    throw new IllegalStateException(
        "The "
            + tpe.namespace()
            + "/"
            + tpe.type()
            + " ingress requires a UTF-8 key set for each record.");
  }
  return key;
}
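
For reference, a minimal producer-side sketch (not from the thread) that sets
a UTF-8 key on every record, which is what the check above requires; it
assumes a plain Kafka-protocol client talking to Event Hubs, and the
bootstrap server, topic and key values are illustrative. The routable Kafka
ingress uses the record key to address the target function, so each record
needs a non-null UTF-8 key.

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "my-namespace.servicebus.windows.net:9093"); // illustrative
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());
    // Event Hubs SASL/SSL settings omitted here.

    byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
      // Every record gets an explicit key, so the ingress check above passes.
      producer.send(new ProducerRecord<>("my-ingress-topic", "some-function-id", payload));
    }
  }
}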

Thanks
Jessy
On Thu, 10 Jun 2021 at 20:13, Jessy Ping  wrote:

> Hi all,
>
> I am trying to consume data from azure eventhub using the kafka ingress
> and i am getting the following error.
>
> *java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress
> requires a UTF-8 key set for each record.*
>
> While sending the data to the Event hub using my data producer, I am not
> sending it with any KEY. And the same data can be consumed without any
> issues with a normal flink application .
>
> I can see the error is raising from
> RoutableProtobufKafkaIngressDeserializer.requireNonNullKey()
>
>
>


Stateful Function Ingress issues

2021-06-10 Thread Jessy Ping
Hi all,

I am trying to consume data from Azure Event Hubs using the Kafka ingress and
I am getting the following error:

*java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress
requires a UTF-8 key set for each record.*

When sending the data to the Event Hub with my data producer, I am not
setting any key, and the same data can be consumed without any issues by a
normal Flink application.

I can see the error is raised from
RoutableProtobufKafkaIngressDeserializer.requireNonNullKey().


Configure Kafka ingress through property files in Stateful function 3.0.0

2021-05-27 Thread Jessy Ping
Hi all,

I am getting the following error when I try to provide Kafka properties
for the ingress using 'ingress.spec.properties: consumer.properties'.

Error: org.apache.flink.statefun.flink.common.json.WrongTypeException: Wrong
type for key /ingress/spec/properties not a key-value list.
and
Caused by: org.apache.flink.statefun.flink.common.json.MissingKeyException:
missing key /ingress/spec/address
at
org.apache.flink.statefun.flink.common.json.Selectors.dereference(Selectors.java:150)

Any insights in this regard would be helpful.

Thanks
Jessy


Re: Regarding Stateful Functions

2021-05-13 Thread Jessy Ping
Hi Austin,


Thanks for your insights.


We are currently following a microservice architecture for our data
processing requirements and are planning to use Flink as our unified
platform for all data processing tasks. Although most of our use cases are a
good fit for Flink, there is one use case that needs a deeper dive into
Flink's capabilities.


As I mentioned in my previous email, the processing flow of the use case
under discussion is as follows:


*ingress(>=10k/s)--> First transformation based on certain static rules -->
second transformation based on certain dynamic rules --> Third and final
transformation based on certain dynamic and static rules --> egress*


In our current design, we use a Hazelcast cluster embedded in our
microservices. It's a complex system with several stability issues, so we
are looking for an open-source alternative, and Stateful Functions powered
by Flink seems like an ideal candidate. The following features of Stateful
Functions attracted us:

1. Consistent State.

2. No Database Required

3. Exactly once semantics.

4. Logical Addressing

5. Multi-language support.


Any additional insights into the questions already mentioned would be helpful.

Thanks

Jessy

On Thu, 13 May 2021 at 04:25, Austin Cawley-Edwards 
wrote:

> Hey Jessy,
>
> I'm not a Statefun expert but, hopefully, I can point you in the right
> direction for some of your questions. I'll also cc Gordan, who helps to
> maintain Statefun.
>
> *1. Is the stateful function a good candidate for a system(as above) that
>> should process incoming requests at the rate of 10K/s depending on various
>> dynamic rules and static rules*
>> *? *
>>
>
> The scale is definitely manageable in a Statefun cluster, and could
> possibly be a good fit for dynamic and static rules. Hopefully Gordon can
> comment more there. For the general Flink solution to this problem, I
> always turn to this great series of blog posts around fraud detection with
> dynamic rules[1].
>
> 2.* Is Flink capable of accommodating the above-mentioned dynamic rules
>> in its states (about 1500 rules per Keyed Event ) for the faster
>> transformation of incoming streams? *
>>
>
> This may be manageable as well, depending on how you are applying these
> rules and what they look like (size, etc.). Can you give any more
> information there?
>
>
> *3.** I**f we are not interested in using AWS lambda or Azure functions,
>> what are the other options?. What about using co-located functions and
>> embedded functions? * *Is there any benefit in using one over the other
>> for my data processing flow?*
>>
>
> Yes, you can embed JVM functions via Embedded Modules[2], which in your
> case might benefit from the Flink DataStream integration[3]. You can also
> host remote functions anywhere, i.e. Kubernetes, behind an NGINX server,
> etc. The Module Configuration section[4] will likely shed more light on
> what is available. I think the main tradeoffs here are availability,
> scalability, and network latency for external functions.
>
> 4*.If we are going with embedded functions/co-located functions, is it
>> possible to autoscale the application using the recently released reactive
>> mode in Flink 1.13?*
>>
>
> Statefun 3.0 uses Flink 1.12 but is expected to upgrade to Flink 1.13 in
> the next release cycle. There are a few other changes that are necessary to
> be compatible with Reactive Mode (i.e make the Statefun Cluster a regular
> Flink Application tracked in FLINK-16930 [5]), but it's coming!
>
>
> On a higher note, what made you interested in Statefun for this use case?
> The community is currently trying to expand our understanding of potential
> users, so it would be great to hear a bit more!
>
> Best,
> Austin
>
> [1]: https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
> [2]:
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/embedded/#embedded-module-configuration
> [3]:
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/sdk/flink-datastream/
> [4]:
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/#module-configuration
> [5]: https://issues.apache.org/jira/browse/FLINK-16930
>
> On Wed, May 12, 2021 at 11:53 AM Jessy Ping 
> wrote:
>
>> Hi all,
>>
>>
>> I have gone through the stateful function's documentation and required
>> some expert advice or clarification regarding the following points.
>>
>>
>> *Note: My data processing flow is as follows,*
>>
>>
>> *ingress(10k/s)--> First transformation based on certain static rules -->
>> second tr

Regarding Stateful Functions

2021-05-12 Thread Jessy Ping
Hi all,


I have gone through the Stateful Functions documentation and require some
expert advice or clarification regarding the following points.


*Note: My data processing flow is as follows,*


*ingress(10k/s)--> First transformation based on certain static rules -->
second transformation based on certain dynamic rules --> Third and final
transformation based on certain dynamic and static rules --> egress*


*Questions*

*1. Is Stateful Functions a good candidate for a system (as above) that
should process incoming requests at a rate of 10K/s, depending on various
dynamic and static rules?*


*2. Is Flink capable of accommodating the above-mentioned dynamic rules in
its state (about 1500 rules per keyed event) for fast transformation of
incoming streams?*


*3. If we are not interested in using AWS Lambda or Azure Functions, what
are the other options? What about using co-located functions and embedded
functions? Is there any benefit in using one over the other for my data
processing flow?*


*4. If we are going with embedded or co-located functions, is it possible
to autoscale the application using the recently released reactive mode in
Flink 1.13?*


*Thanks*

*Jessy*


Flink: Clarification required

2021-05-10 Thread Jessy Ping
Hi all,


Currently, we are exploring the various features of Flink and need some
clarification on the below-mentioned questions.


   - I have a stateless Flink application where the source and sink are two
   different Kafka topics. Is there any benefit in adding checkpointing to
   this application? Will it help in some way with rewind and replay when
   restarting after a failure? (See the sketch after this list.)

   - I have a stateful use case where events are processed based on a set
   of dynamic rules provided by an external system, say a Kafka source. The
   actual events are distinguishable by a key. A broadcast function is used
   for broadcasting the dynamic rules and storing them in Flink state.

   So my question is: is processing the incoming streams based on these
   rules, stored in Flink state per key, efficient or not (I am using RocksDB
   as the state backend)?

   What about using an external cache for this?

   Are Stateful Functions a good contender here?

   - Is there any benefit in using Apache Camel along with Flink?
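
For reference, a minimal sketch of enabling checkpointing for such a job (the
interval and mode are illustrative). Even for a stateless pipeline, the Kafka
source's offsets are stored in the checkpoint, which is what allows the job
to rewind and replay from the last successful checkpoint after a failure:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000);   // illustrative: one checkpoint per minute
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // ... Kafka source -> transformations -> Kafka sink would be defined here ...
    // env.execute("stateless-kafka-to-kafka");
  }
}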



Thanks
Jessy


Re: Editing job graph at runtime

2021-03-23 Thread Jessy Ping
Hi Arvid,

Thanks for the reply.

I am currently exploring Flink's features, and we have certain use cases
where new producers will be added to the system dynamically; we don't want
to restart the application frequently.

It would be helpful if you could explain option 2 in more detail.

Thanks & Regards
Jessy

On Mon, 22 Mar 2021 at 19:27, Arvid Heise  wrote:

> Hi Jessy,
>
> Can I add a new sink into the execution graph at runtime, for example : a
>> new Kafka producer , without restarting the current application  or using
>> option1 ?
>>
>
> No, there is no way to add a sink without restart currently. Could you
> elaborate why a restart is not an option for you?
>
> You can use Option 2, which means that you implement 1 source and 1 sink
> which will dynamically read from or write to different topics possibly by
> wrapping the existing source and sink. This is a rather complex task that I
> would not recommend to a new Flink user.
>
> If you have a known set of possible sink topics, another option would be
> to add all sinks from the go and only route messages dynamically with
> side-outputs. However, I'm not aware that such a pattern exists for
> sources. Although with the new source interface, it should be possible to
> do that.
>
> On Wed, Mar 17, 2021 at 7:12 AM Jessy Ping 
> wrote:
>
>> Hi Team,
>>
>> Can you provide your thoughts on this, it will be helpful ..
>>
>> Thanks
>> Jessy
>>
>> On Tue, 16 Mar 2021 at 21:29, Jessy Ping 
>> wrote:
>>
>>> Hi Timo/Team,
>>> Thanks for the reply.
>>>
>>> Just take the example from the following pseduo code,
>>> Suppose , this is the current application logic.
>>>
>>> firstInputStream = addSource(...)* //Kafka consumer C1*
>>> secondInputStream =  addSource(...) *//Kafka consumer C2*
>>>
>>> outputStream = firstInputStream,keyBy(a -> a.key)
>>> .connect(secondInputStream.keyBy(b->b.key))
>>> .coProcessFunction()
>>> * // logic determines : whether a new sink should be added to the
>>> application or not ?. If not: then the event will be produced to the
>>> existing sink(s). If a new sink is required: produce the events to the
>>> existing sinks + the new one*
>>> sink1 = addSink(outPutStream). //Kafka producer P1
>>> .
>>> .
>>> .
>>> sinkN =  addSink(outPutStream). //Kafka producer PN
>>>
>>> *Questions*
>>> --> Can I add a new sink into the execution graph at runtime, for
>>> example : a new Kafka producer , without restarting the current
>>> application  or using option1 ?
>>>
>>> -->  (Option 2 )What do you mean by adding a custom sink at
>>> coProcessFunction , how will it change the execution graph ?
>>>
>>> Thanks
>>> Jessy
>>>
>>>
>>>
>>> On Tue, 16 Mar 2021 at 17:45, Timo Walther  wrote:
>>>
>>>> Hi Jessy,
>>>>
>>>> to be precise, the JobGraph is not used at runtime. It is translated
>>>> into an ExecutionGraph.
>>>>
>>>> But nevertheless such patterns are possible but require a bit of manual
>>>> implementation.
>>>>
>>>> Option 1) You stop the job with a savepoint and restart the application
>>>> with slightly different parameters. If the pipeline has not changed
>>>> much, the old state can be remapped to the slightly modified job graph.
>>>> This is the easiest solution but with the downside of maybe a couple of
>>>> seconds downtime.
>>>>
>>>> Option 2) You introduce a dedicated control stream (i.e. by using the
>>>> connect() DataStream API [1]). Either you implement a custom sink in
>>>> the
>>>> main stream of the CoProcessFunction. Or you enrich every record in the
>>>> main stream with sink parameters that are read by you custom sink
>>>> implementation.
>>>>
>>>> I hope this helps.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> [1]
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect
>>>>
>>>> On 16.03.21 12:37, Jessy Ping wrote:
>>>> > Hi Team,
>>>> > Is it possible to edit the job graph at runtime ? . Suppose, I want
>>>> to
>>>> > add a new sink to the flink application at runtime that depends upon
>>>> > the  specific parameters in the incoming events.Can i edit the
>>>> jobgraph
>>>> > of a running flink application ?
>>>> >
>>>> > Thanks
>>>> > Jessy
>>>>
>>>>
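
For reference, a minimal sketch (not from the thread) of the "single dynamic
sink" idea Arvid and Timo describe, using the Flink 1.12-era Kafka connector:
the target topic is carried on each record (RoutedEvent and its fields are
illustrative, e.g. set by the CoProcessFunction upstream) and is chosen per
record inside the serialization schema, so one producer can write to any
topic without changing the job graph.

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DynamicTopicSink {

  /** Record enriched upstream (e.g. in the CoProcessFunction) with its target topic. */
  public static class RoutedEvent {
    public String targetTopic;
    public String payload;
  }

  public static FlinkKafkaProducer<RoutedEvent> create(Properties kafkaProps) {
    KafkaSerializationSchema<RoutedEvent> schema =
        (event, timestamp) ->
            new ProducerRecord<>(
                event.targetTopic,                       // topic decided at runtime, per record
                event.payload.getBytes(StandardCharsets.UTF_8));
    return new FlinkKafkaProducer<>(
        "fallback-topic",                                // default topic required by the constructor
        schema,
        kafkaProps,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
  }
}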


Re: Editing job graph at runtime

2021-03-17 Thread Jessy Ping
Hi Team,

Can you provide your thoughts on this? It would be helpful.

Thanks
Jessy

On Tue, 16 Mar 2021 at 21:29, Jessy Ping  wrote:

> Hi Timo/Team,
> Thanks for the reply.
>
> Just take the example from the following pseduo code,
> Suppose , this is the current application logic.
>
> firstInputStream = addSource(...)* //Kafka consumer C1*
> secondInputStream =  addSource(...) *//Kafka consumer C2*
>
> outputStream = firstInputStream,keyBy(a -> a.key)
> .connect(secondInputStream.keyBy(b->b.key))
> .coProcessFunction()
> * // logic determines : whether a new sink should be added to the
> application or not ?. If not: then the event will be produced to the
> existing sink(s). If a new sink is required: produce the events to the
> existing sinks + the new one*
> sink1 = addSink(outPutStream). //Kafka producer P1
> .
> .
> .
> sinkN =  addSink(outPutStream). //Kafka producer PN
>
> *Questions*
> --> Can I add a new sink into the execution graph at runtime, for example
> : a new Kafka producer , without restarting the current application  or
> using option1 ?
>
> -->  (Option 2 )What do you mean by adding a custom sink at
> coProcessFunction , how will it change the execution graph ?
>
> Thanks
> Jessy
>
>
>
> On Tue, 16 Mar 2021 at 17:45, Timo Walther  wrote:
>
>> Hi Jessy,
>>
>> to be precise, the JobGraph is not used at runtime. It is translated
>> into an ExecutionGraph.
>>
>> But nevertheless such patterns are possible but require a bit of manual
>> implementation.
>>
>> Option 1) You stop the job with a savepoint and restart the application
>> with slightly different parameters. If the pipeline has not changed
>> much, the old state can be remapped to the slightly modified job graph.
>> This is the easiest solution but with the downside of maybe a couple of
>> seconds downtime.
>>
>> Option 2) You introduce a dedicated control stream (i.e. by using the
>> connect() DataStream API [1]). Either you implement a custom sink in the
>> main stream of the CoProcessFunction. Or you enrich every record in the
>> main stream with sink parameters that are read by you custom sink
>> implementation.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect
>>
>> On 16.03.21 12:37, Jessy Ping wrote:
>> > Hi Team,
>> > Is it possible to edit the job graph at runtime ? . Suppose, I want to
>> > add a new sink to the flink application at runtime that depends upon
>> > the  specific parameters in the incoming events.Can i edit the jobgraph
>> > of a running flink application ?
>> >
>> > Thanks
>> > Jessy
>>
>>


Re: Editing job graph at runtime

2021-03-16 Thread Jessy Ping
Hi Timo/Team,
Thanks for the reply.

Just take the example from the following pseudo code; suppose this is the
current application logic:

firstInputStream = addSource(...)   // Kafka consumer C1
secondInputStream = addSource(...)  // Kafka consumer C2

outputStream = firstInputStream.keyBy(a -> a.key)
    .connect(secondInputStream.keyBy(b -> b.key))
    .coProcessFunction()
    // logic determines whether a new sink should be added to the application
    // or not. If not: the event is produced to the existing sink(s). If a new
    // sink is required: produce the events to the existing sinks + the new one.
sink1 = addSink(outputStream)  // Kafka producer P1
.
.
.
sinkN = addSink(outputStream)  // Kafka producer PN

*Questions*
--> Can I add a new sink to the execution graph at runtime, for example a
new Kafka producer, without restarting the current application or using
option 1?

--> (Option 2) What do you mean by adding a custom sink in the
coProcessFunction, and how will it change the execution graph?

Thanks
Jessy



On Tue, 16 Mar 2021 at 17:45, Timo Walther  wrote:

> Hi Jessy,
>
> to be precise, the JobGraph is not used at runtime. It is translated
> into an ExecutionGraph.
>
> But nevertheless such patterns are possible but require a bit of manual
> implementation.
>
> Option 1) You stop the job with a savepoint and restart the application
> with slightly different parameters. If the pipeline has not changed
> much, the old state can be remapped to the slightly modified job graph.
> This is the easiest solution but with the downside of maybe a couple of
> seconds downtime.
>
> Option 2) You introduce a dedicated control stream (i.e. by using the
> connect() DataStream API [1]). Either you implement a custom sink in the
> main stream of the CoProcessFunction. Or you enrich every record in the
> main stream with sink parameters that are read by you custom sink
> implementation.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect
>
> On 16.03.21 12:37, Jessy Ping wrote:
> > Hi Team,
> > Is it possible to edit the job graph at runtime ? . Suppose, I want to
> > add a new sink to the flink application at runtime that depends upon
> > the  specific parameters in the incoming events.Can i edit the jobgraph
> > of a running flink application ?
> >
> > Thanks
> > Jessy
>
>


Editing job graph at runtime

2021-03-16 Thread Jessy Ping
Hi Team,
Is it possible to edit the job graph at runtime? Suppose I want to add a
new sink to the Flink application at runtime that depends on specific
parameters in the incoming events. Can I edit the job graph of a running
Flink application?

Thanks
Jessy