Re: SNI issue

2022-12-08 Thread Yanfei Lei
Hi, I didn't face this issue myself. I'm guessing it might have something to
do with the SSL configuration [1]. Have you configured the
"security.ssl.rest.enabled" option?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-ssl/#configuring-ssl


Jean-Damien Hatzenbuhler via user wrote on Thu, Dec 8, 2022 at 01:12:

> Hello,
> When using the JobManager API behind an HTTPS proxy that uses SNI to route
> the traffic, I get an issue because the Flink CLI doesn't send the SNI
> when calling the API over HTTPS.
> Did other users face this issue?
> Regards
>


-- 
Best,
Yanfei


Re: Exceeded Checkpoint tolerable failure

2022-12-08 Thread Yanfei Lei
Hi Madan,

Maybe you can check the value of
"execution.checkpointing.tolerable-failed-checkpoints" [1] in your
application configuration and try to increase it?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints
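
For reference, this threshold can also be set programmatically on the
CheckpointConfig; a minimal sketch (the interval and threshold values below
are examples only, not taken from this thread):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableCheckpointFailuresExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds (example value).
        env.enableCheckpointing(60_000L);

        // Programmatic equivalent of execution.checkpointing.tolerable-failed-checkpoints:
        // tolerate up to 3 failed checkpoints before the job is failed and restarted.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // ... define the rest of the pipeline and call env.execute() ...
    }
}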

Madan D via user wrote on Thu, Dec 8, 2022 at 11:40:

> Hello All,
> I am seeing the issue below after upgrading from 1.9.0 to 1.14.2 while
> publishing messages to Pub/Sub, which is causing frequent job restarts and
> slow processing.
>
> Can you please help me?
>
> Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded
> checkpoint tolerable failure threshold.
>at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:98)
>at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:67)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1940)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1912)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:98)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1996)
>at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at java.util.concurrent.ThreadPoolExecut
>
> Regards,
> Madan
>
>
>

-- 
Best,
Yanfei


Re: Exceeded Checkpoint tolerable failure

2022-12-08 Thread Hangxiang Yu
Hi, Madan.
I think there is a root cause underneath this exception; could you share it?
BTW, if you don't set a value for
execution.checkpointing.tolerable-failed-checkpoints [1], I'd recommend
setting it, which can avoid job restarts due to recoverable temporary
problems.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints

On Thu, Dec 8, 2022 at 11:41 AM Madan D via user 
wrote:

> Hello All,
> I am seeing the issue below after upgrading from 1.9.0 to 1.14.2 while
> publishing messages to Pub/Sub, which is causing frequent job restarts and
> slow processing.
>
> Can you please help me?
>
> Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded
> checkpoint tolerable failure threshold.
>at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:98)
>at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:67)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1940)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1912)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:98)
>at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1996)
>at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at java.util.concurrent.ThreadPoolExecut
>
> Regards,
> Madan
>
>
>

-- 
Best,
Hangxiang.


Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-08 Thread Hangxiang Yu
Hi, Lars.
Could you check whether you have configured lifecycle management for Google
Cloud Storage [1]? That is not recommended for buckets used for Flink
checkpoints.

[1] https://cloud.google.com/storage/docs/lifecycle

On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven  wrote:

> Hello,
> We had an incident today with a job that could not restore after a crash
> (for an unknown reason). Specifically, it fails due to a missing checkpoint
> file. We've experienced this a total of three times with Flink 1.15.2, but
> never with 1.14.x. Last time was during a node upgrade, but that was not
> the case this time.
>
> I've not been able to reproduce this issue. I've checked that I can kill
> the taskmanager and jobmanager (using kubectl delete pod), and the job
> restores as expected.
>
> The job is running with kubernetes high availability, rocksdb and
> incremental checkpointing.
>
> Any tips are highly appreciated.
>
> Thanks,
> Lars
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
> 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
> ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected exception.
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
> at
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
> at
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 13 more
> Caused by: java.io.FileNotFoundException: Item not found:
> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
> Note, it is possible that the live version is still available but the
> requested generation is deleted.
> at
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)
>
>

-- 
Best,
Hangxiang.


Re: ZLIB Vulnerability Exposure in Flink statebackend RocksDB

2022-12-08 Thread Vidya Sagar Mula
Thank you, Yanfei, for treating this issue as a bug and planning a fix in an
upcoming version.

I have another vulnerability finding against our product. It is related to
the "LZ4" compression library version. Can you please take a look at this
link?
https://nvd.nist.gov/vuln/detail/CVE-2019-17543

I have noticed that the Flink code base is using 1.8.0, while the vulnerability is present in versions before 1.9.2.

https://github.com/apache/flink/blob/master/pom.xml

Can you please look into this issue also and address it in the coming
releases?

Questions:
---
- Is the code actually using this compression library? Can this
vulnerability issue be ignored?

- Can you please let me know whether this is going to be addressed? If yes,
until we move to a new Flink version to pick up the fix, would it be OK if we
upgrade the LZ4 version in our locally cloned code base? I would like to
understand the impact of such a change to our local Flink code with regard to
testing effort and any other affected modules.

Can you please clarify this?

Thanks,
Vidya Sagar.


On Wed, Dec 7, 2022 at 7:59 AM Yanfei Lei  wrote:

> Hi Vidya Sagar,
>
> Thanks for bringing this up.
>
> The RocksDB state backend defaults to Snappy [1]. If the compression option
> is not specifically configured, this ZLIB vulnerability has no effect on the
> Flink application for the time being.
>
> > is there any plan in the coming days to address this?
>
> The FRocksDB 6.20.3-ververica-1.0 release does depend on ZLIB 1.2.11;
> FLINK-30321 [2] is created to address this.
>
> > If this needs to be fixed, is there any plan from Ververica to address
> > this vulnerability?
>
> Yes, we plan to publish a new version of FRocksDB [3] in Flink 1.17, and
> FLINK-30321 would be included in the new release.
>
> > how to address this vulnerability issue as this is coming as a high
> > severity blocking issue to our product.
>
> As a kind of mitigation, don't configure ZLIB compression for RocksDB
> state backend.
> If ZLIB must be used now and your product can't wait, maybe you can refer
> to this release document[4] to release your own version.
>
> [1] https://github.com/facebook/rocksdb/wiki/Compression
> [2] https://issues.apache.org/jira/browse/FLINK-30321
> [3] https://cwiki.apache.org/confluence/display/FLINK/1.17+Release
> [4]
> https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/FROCKSDB-RELEASE.md
>
> --
> Best,
> Yanfei
> Ververica (Alibaba)
>
> Vidya Sagar Mula wrote on Wed, Dec 7, 2022 at 06:47:
>
>> Hi,
>>
>> There is a ZLIB vulnerability reported by the official National
>> Vulnerability Database. This vulnerability causes memory corruption while
>> deflating with ZLIB versions earlier than 1.2.12.
>> Here is the link for details...
>>
>>
>> https://nvd.nist.gov/vuln/detail/cve-2018-25032#vulnCurrentDescriptionTitle
>>
>> How is it linked to Flink?:
>> In the Flink RocksDB state backend, ZLIB version 1.2.11 is used as part of
>> the .so file. Hence, there is vulnerability exposure here.
>>
>> Flink code details/links:
>> In the latest Flink code base, the RocksDB state backend library
>> (frocksdbjni) is coming from Ververica. The pom.xml dependency snippet is
>> here:
>>
>>
>> https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/pom.xml
>>
>> <dependency>
>>     <groupId>com.ververica</groupId>
>>     <artifactId>frocksdbjni</artifactId>
>>     <version>6.20.3-ververica-1.0</version>
>> </dependency>
>>
>>
>> Looking at the frocksdbjni code base, the Makefile points to
>> ZLIB_VER=1.2.11. This ZLIB version is vulnerable as per the NVD.
>>
>> https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/Makefile
>>
>> Questions:
>>
>> - This vulnerability is marked as HIGH severity. How is it addressed in
>> Flink / the Flink RocksDB state backend? If not now, is there any plan in
>> the coming days to address this?
>>
>> - As the RocksDB state backend is coming from Ververica, I am not seeing
>> any newer artifacts published by them. As per the Maven Repository, the
>> latest version is 6.20.3-ververica-1.0, and this is the one used in the
>> Flink code base.
>>
>> https://mvnrepository.com/artifact/com.ververica/frocksdbjni
>>
>> If this needs to be fixed, is there any plan from Ververica to address
>> this vulnerability?
>>
>> - From the Flink user perspective, it is not simple to make changes to
>> the .so file locally. How are companies using Flink addressing this
>> vulnerability, given that it requires changes to the .so file?
>>
>> Overall, my main question to the community is how to address this
>> vulnerability, as it is coming up as a high-severity blocking issue for
>> our product.
>>
>> Please provide the inputs/suggestions at the earliest.
>>
>> Thanks,
>> Vidya Sagar.
>>
>>
>>
>>
>>
>
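
As a minimal sketch of the mitigation described in the quoted reply above
(keep the RocksDB state backend on its default Snappy compression and never
configure ZLIB), the compression type can be pinned explicitly through a
custom RocksDBOptionsFactory. This is only an illustration, under the
assumption that the state backend is configured in code; it is not taken from
the thread:

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;

public class SnappyOnlyRocksDBOptions implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // keep Flink's defaults for DB-level options
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Pin compression to Snappy so ZLIB is never used for SST files.
        return currentOptions.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true); // incremental
        backend.setRocksDBOptions(new SnappyOnlyRocksDBOptions());
        env.setStateBackend(backend);
        // ... define and execute the job ...
    }
}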


Could not restore keyed state backend for KeyedProcessOperator

2022-12-08 Thread Lars Skjærven
Hello,
We had an incident today with a job that could not restore after a crash (for
an unknown reason). Specifically, it fails due to a missing checkpoint file.
We've experienced this a total of three times with Flink 1.15.2, but never
with 1.14.x. Last time was during a node upgrade, but that was not the case
this time.

I've not been able to reproduce this issue. I've checked that I can kill
the taskmanager and jobmanager (using kubectl delete pod), and the job
restores as expected.

The job is running with kubernetes high availability, rocksdb and
incremental checkpointing.

Any tips are highly appreciated.

Thanks,
Lars

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for
KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
unexpected exception.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.io.FileNotFoundException: Item not found:
'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
Note, it is possible that the live version is still available but the
requested generation is deleted.
at
com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)


Re: Deterministic co-results

2022-12-08 Thread Salva Alcántara
Hi Alexis,

Thanks a lot for your reply & guidance, which makes a lot of sense to me
overall.

Regards,

Salva

On Thu, Dec 8, 2022 at 5:34 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Salva,
>
> Just to give you further thoughts from another user, I think the "temporal
> join" semantics are very critical in this use case, and what you implement
> for that may not easily generalize to other cases. Because of that, I'm not
> sure if you can really define best practices that apply in general.
> Additionally, you also have to take idleness into account, given that using
> event-time could leave you in a "frozen" state if you're not receiving
> events continuously.
>
> I also doubt you can accurately estimate out-of-orderness in this scenario
> due to the particularities of Flink's network stack [1]. Even if you only
> have 2 sources and immediately connect them together, parallelism and the
> resulting shuffles can change from one execution to the other even if you
> don't change the logic at all, because scheduling is also non-deterministic
> and the "distribution" of events across different parallel instances of
> your sources could vary a lot as well.
>
> I think that others will tell you that you indeed need to find a way to
> buffer events for a while; I got the same advice in the past. Focusing very
> specifically on what you described (streams for data & control events +
> event-time + temporal join), but maybe also assuming you can manage
> watermarks in a way that handles idleness without simply freezing the
> stream, I once tried a custom operator (not function) to force the
> watermarks of 1 stream to wait for the other one -
> see PrioritizedWatermarkStreamIntervalJoinOperator in [2]. This still uses
> the idea of buffering, it just moves that responsibility to the operator
> that's already handling "window data" for the join. Also, it extends an
> internal class, so it's very much unofficial, and it's probably too
> specific to my use case, but maybe it gives you other ideas to consider.
>
> And one last detail to further exemplify complexity here: when I was
> testing my custom operator with event-time simulations in my IDE, I
> initially didn't think about the fact that a watermark with Long.MAX_VALUE
> is sent at the end of the simulation, which was another source of
> non-determinism because sometimes the control stream was processed entirely
> (including the max watermark) before a single event from the data stream
> went through, which meant that all events from the data stream were
> considered late arrivals and silently dropped.
>
> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html
> [2] https://lists.apache.org/thread/n79tsqd5y474n5th1wdhkrh7bvlts8bs
>
> Regards,
> Alexis.
>
> Am Do., 8. Dez. 2022 um 15:20 Uhr schrieb Salva Alcántara <
> salcantara...@gmail.com>:
>
>> Just for adding some extra references:
>>
>> [5]
>> https://stackoverflow.com/questions/50536364/apache-flink-coflatmapfunction-does-not-process-events-in-event-time-order
>> [6]
>> https://stackoverflow.com/questions/61046062/controlling-order-of-processed-elements-within-coprocessfunction-using-custom-so
>> [7]
>> https://stackoverflow.com/questions/48692658/how-to-concatenate-two-streams-in-apache-flink/48711260#48711260
>>
>> Salva
>>
>> On 2022/12/07 18:52:42 Salva Alcántara wrote:
>> > It's well-known that Flink does not provide any guarantees on the order
>> in
>> > which a CoProcessFunction (or one of its multiple variations) processes
>> its
>> > two inputs [1]. I wonder then what is the current best
>> practice/recommended
>> > approach for cases where one needs deterministic results in presence of:
>> >
>> > 1. A control stream
>> > 2. An event/data stream
>> >
>> > Let's consider event-time semantics; both streams have timestamps, and
>> we
>> > want to implement "temporal join" semantics where the input events are
>> > controlled based on the latest control signals received before them,
>> i.e.,
>> > the ones "active" when the events happened. For simplicity, let's assume
>> > that all events are received in order, so that the only source of
>> > non-determinism is the one introduced by the CoProcessFunction itself.
>> >
>> > I'm considering the following options:
>> >
>> > 1. Buffer events inside the CoProcessFunction for some time, while
>> saving
>> > all the control signals in state (indexed by time)
>> > 2. Same as before but doing the pre-buffering of the event/data streams
>> > before the CoProcessFunction
>> > 3. Similar as before but considering both streams together by
>> multiplexing
>> > them into one heterogeneous stream which would be pre-sorted in order to
>> > guarantee the global ordering of the events from the two different
>> sources.
>> > Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a
>> > ProcessFunction[Either[IN1, IN2], OUT] which by construction will
>> process
>> > the data in order and hence produce deterministic results
>> >
>> > Essenti

KafkaSource and Event Time in Message Payload

2022-12-08 Thread Niklas Wilcke
Hi Flink Community,

I have a few questions regarding the new KafkaSource and event time, which I
wasn't able to answer myself by checking the docs, but please point me to the
right pages in case I missed something. I'm not entirely sure whether my
knowledge holds for the new KafkaSource, because it is rather new to me.
Please correct me when I'm making false assumptions.

I make the following assumptions, which you can hopefully confirm.

a1) The KafkaSource by default uses the Kafka timestamp to extract watermarks
a2) There is a mechanism in place to synchronise the consumption from all Kafka 
partitions based on the event time / watermarks (important in case we are 
facing a noticeable consumer lag)

Now lets assume the following scenario

s1) The Kafka timestamp doesn't contain the event time, but only the timestamp 
when the record was written to the topic
s2) The event timestamp is buried deep in the message payload

My question now is: what is the best practice for extracting the event-time
timestamp? I see the following options for handling this problem.

o1) Implement the parsing of the message in the deserializer and directly
extract the event-time timestamp there. This comes with the drawback of
putting complex parsing logic into the deserializer (see the sketch further
below).

o2) Let the producer set the Kafka timestamp to the actual event time. This 
comes with the following drawbacks:
  - The Kafka write timestamp is lost
  - The producer (which is in our case only forwarding messages) needs to 
extract the timestamp on its own from the message payload

o3) Leave the Kafka timestamp as it is and create watermarks downstream in the
pipeline after the parsing. This comes with the following drawbacks:
  - Scenarios where event time and Kafka timestamp drift apart are possible,
and therefore the synchronisation mechanism described in a2 isn't really
guaranteed to work properly.

Currently we are using o3, and I'm investigating whether o1 or o2 is a better
alternative that doesn't involve sacrificing a2. Can you share any best
practices here? Is it recommended to always use the Kafka timestamp to hold
the event time? Are there any other points I missed that make one of the
options preferable?
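
As a rough sketch of option o1: parse the payload once in the deserializer and
assign the event time from the parsed field directly at the source, so that
the KafkaSource's per-split watermark generation operates on the payload
timestamps. The MyEvent type, the "epochMillis;rest" payload format, and the
broker/topic names are assumptions for illustration; whether this fully
restores the consumption synchronisation of a2 depends on the Flink version
and its watermark alignment support.

import java.nio.charset.StandardCharsets;
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PayloadEventTimeExample {

    // Hypothetical event type produced by the parser.
    public static class MyEvent {
        public long eventTimeMillis;
        public String payload;
    }

    // Hypothetical parser; stands in for the "complex parsing logic" of o1.
    // Assumes records of the form "<epochMillis>;<rest of payload>".
    static MyEvent parseBytes(byte[] bytes) {
        String raw = new String(bytes, StandardCharsets.UTF_8);
        MyEvent event = new MyEvent();
        event.eventTimeMillis = Long.parseLong(raw.substring(0, raw.indexOf(';')));
        event.payload = raw;
        return event;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<MyEvent> source = KafkaSource.<MyEvent>builder()
                .setBootstrapServers("kafka:9092")          // placeholder address
                .setTopics("events")                        // placeholder topic
                .setGroupId("payload-event-time-example")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // o1: parse the payload once, inside the source's deserializer.
                .setValueOnlyDeserializer(new AbstractDeserializationSchema<MyEvent>() {
                    @Override
                    public MyEvent deserialize(byte[] message) {
                        return parseBytes(message);
                    }
                })
                .build();

        // Timestamps come from the parsed payload, not from the Kafka record
        // timestamp, and are assigned at the source so watermarks are generated
        // per partition/split.
        WatermarkStrategy<MyEvent> watermarks = WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, kafkaTimestamp) -> event.eventTimeMillis);

        env.fromSource(source, watermarks, "kafka-events").print();
        env.execute("payload-event-time-example");
    }
}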

Thank you for taking the time.

Kind Regards,
Niklas



Re: Deterministic co-results

2022-12-08 Thread Alexis Sarda-Espinosa
Hi Salva,

Just to give you further thoughts from another user, I think the "temporal
join" semantics are very critical in this use case, and what you implement
for that may not easily generalize to other cases. Because of that, I'm not
sure if you can really define best practices that apply in general.
Additionally, you also have to take idleness into account, given that using
event-time could leave you in a "frozen" state if you're not receiving
events continuously.

I also doubt you can accurately estimate out-of-orderness in this scenario
due to the particularities of Flink's network stack [1]. Even if you only
have 2 sources and immediately connect them together, parallelism and the
resulting shuffles can change from one execution to the other even if you
don't change the logic at all, because scheduling is also non-deterministic
and the "distribution" of events across different parallel instances of
your sources could vary a lot as well.

I think that others will tell you that you indeed need to find a way to
buffer events for a while; I got the same advice in the past. Focusing very
specifically on what you described (streams for data & control events +
event-time + temporal join), but maybe also assuming you can manage
watermarks in a way that handles idleness without simply freezing the
stream, I once tried a custom operator (not function) to force the
watermarks of 1 stream to wait for the other one -
see PrioritizedWatermarkStreamIntervalJoinOperator in [2]. This still uses
the idea of buffering, it just moves that responsibility to the operator
that's already handling "window data" for the join. Also, it extends an
internal class, so it's very much unofficial, and it's probably too
specific to my use case, but maybe it gives you other ideas to consider.
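
For completeness, below is a bare-bones sketch of the buffering idea (option 1
from the original post): data events are held in keyed state until the
watermark, which is driven by both inputs, passes their timestamp, and are
then joined with the latest control signal at or before that time. The
DataEvent/ControlSignal/EnrichedEvent types and the String key are
placeholders, timestamps are assumed to be assigned upstream, and a sorted
state structure would avoid the linear scan over control signals; treat it as
an illustration, not a recommended implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class TemporalControlJoin
        extends KeyedCoProcessFunction<String, TemporalControlJoin.DataEvent,
                TemporalControlJoin.ControlSignal, TemporalControlJoin.EnrichedEvent> {

    // Placeholder types, not from the thread.
    public static class DataEvent { public String key; public String value; }
    public static class ControlSignal { public String key; public String rule; }
    public static class EnrichedEvent {
        public DataEvent event;
        public ControlSignal control;
        public EnrichedEvent() {}
        public EnrichedEvent(DataEvent e, ControlSignal c) { event = e; control = c; }
    }

    private transient MapState<Long, List<DataEvent>> bufferedEvents; // event time -> events
    private transient MapState<Long, ControlSignal> controlsByTime;   // event time -> control

    @Override
    public void open(Configuration parameters) {
        bufferedEvents = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "buffered-events", Types.LONG, Types.LIST(Types.POJO(DataEvent.class))));
        controlsByTime = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "controls-by-time", Types.LONG, Types.POJO(ControlSignal.class)));
    }

    @Override
    public void processElement1(DataEvent event, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        long ts = ctx.timestamp(); // assumes timestamps/watermarks are assigned upstream
        List<DataEvent> batch = bufferedEvents.get(ts);
        if (batch == null) {
            batch = new ArrayList<>();
        }
        batch.add(event);
        bufferedEvents.put(ts, batch);
        // Emit only once the watermark, driven by BOTH inputs, has passed this timestamp.
        ctx.timerService().registerEventTimeTimer(ts);
    }

    @Override
    public void processElement2(ControlSignal control, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        controlsByTime.put(ctx.timestamp(), control);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<EnrichedEvent> out)
            throws Exception {
        // Latest control signal at or before the event time; a sorted structure
        // would avoid this scan, and old controls should eventually be pruned.
        ControlSignal active = null;
        long bestTs = Long.MIN_VALUE;
        for (Map.Entry<Long, ControlSignal> e : controlsByTime.entries()) {
            if (e.getKey() <= timestamp && e.getKey() > bestTs) {
                bestTs = e.getKey();
                active = e.getValue();
            }
        }
        for (DataEvent event : bufferedEvents.get(timestamp)) {
            out.collect(new EnrichedEvent(event, active));
        }
        bufferedEvents.remove(timestamp);
    }
}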

And one last detail to further exemplify complexity here: when I was
testing my custom operator with event-time simulations in my IDE, I
initially didn't think about the fact that a watermark with Long.MAX_VALUE
is sent at the end of the simulation, which was another source of
non-determinism because sometimes the control stream was processed entirely
(including the max watermark) before a single event from the data stream
went through, which meant that all events from the data stream were
considered late arrivals and silently dropped.

[1] https://flink.apache.org/2019/06/05/flink-network-stack.html
[2] https://lists.apache.org/thread/n79tsqd5y474n5th1wdhkrh7bvlts8bs

Regards,
Alexis.

Am Do., 8. Dez. 2022 um 15:20 Uhr schrieb Salva Alcántara <
salcantara...@gmail.com>:

> Just for adding some extra references:
>
> [5]
> https://stackoverflow.com/questions/50536364/apache-flink-coflatmapfunction-does-not-process-events-in-event-time-order
> [6]
> https://stackoverflow.com/questions/61046062/controlling-order-of-processed-elements-within-coprocessfunction-using-custom-so
> [7]
> https://stackoverflow.com/questions/48692658/how-to-concatenate-two-streams-in-apache-flink/48711260#48711260
>
> Salva
>
> On 2022/12/07 18:52:42 Salva Alcántara wrote:
> > It's well-known that Flink does not provide any guarantees on the order
> in
> > which a CoProcessFunction (or one of its multiple variations) processes
> its
> > two inputs [1]. I wonder then what is the current best
> practice/recommended
> > approach for cases where one needs deterministic results in presence of:
> >
> > 1. A control stream
> > 2. An event/data stream
> >
> > Let's consider event-time semantics; both streams have timestamps, and we
> > want to implement "temporal join" semantics where the input events are
> > controlled based on the latest control signals received before them,
> i.e.,
> > the ones "active" when the events happened. For simplicity, let's assume
> > that all events are received in order, so that the only source of
> > non-determinism is the one introduced by the CoProcessFunction itself.
> >
> > I'm considering the following options:
> >
> > 1. Buffer events inside the CoProcessFunction for some time, while saving
> > all the control signals in state (indexed by time)
> > 2. Same as before but doing the pre-buffering of the event/data streams
> > before the CoProcessFunction
> > 3. Similar as before but considering both streams together by
> multiplexing
> > them into one heterogeneous stream which would be pre-sorted in order to
> > guarantee the global ordering of the events from the two different
> sources.
> > Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a
> > ProcessFunction[Either[IN1, IN2], OUT] which by construction will process
> > the data in order and hence produce deterministic results
> >
> > Essentially, all the strategies amount to introducing a "minimum amount
> of
> > delay" to guarantee the deterministic processing, which brings me to the
> > following question:
> >
> > * How to get an estimate for the out-of-order-ness bound that a
> > CoProcessFunction can introduce? Is that even possible in the first
> place?
> > I guess this mostly

Re: Accessing kafka message key from a KafkaSource

2022-12-08 Thread Noel OConnor
Thanks, I'll try that.

On Wed, Dec 7, 2022 at 7:19 PM Yaroslav Tkachenko  wrote:
>
> Hi Noel,
>
> It's definitely possible. You need to implement a custom 
> KafkaRecordDeserializationSchema: its "deserialize" method gives you a 
> ConsumerRecord as an argument so that you can extract Kafka message key, 
> headers, timestamp, etc.
>
> Then pass that when you create a KafkaSource via "setDeserializer" method.
>
> On Wed, Dec 7, 2022 at 6:14 AM Noel OConnor  wrote:
>>
>> Hi,
>> I'm using a KafkaSource to read messages from Kafka into a DataStream.
>> However, I can't seem to access the key of the Kafka message in the
>> DataStream.
>> Is this even possible?
>>
>> cheers
>> Noel
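
For reference, a minimal sketch of what Yaroslav describes, assuming string
keys and values; the topic, group id, and Tuple2 output type are illustrative
only and not from the thread.

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KeyAndValueDeserializer
        implements KafkaRecordDeserializationSchema<Tuple2<String, String>> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record,
            Collector<Tuple2<String, String>> out) {
        // The raw ConsumerRecord exposes the key (and headers, timestamp, ...).
        String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
        String value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
        out.collect(Tuple2.of(key, value));
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return Types.TUPLE(Types.STRING, Types.STRING);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<Tuple2<String, String>> source = KafkaSource.<Tuple2<String, String>>builder()
                .setBootstrapServers("kafka:9092")   // placeholder address
                .setTopics("my-topic")               // placeholder topic
                .setGroupId("key-example")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(new KeyAndValueDeserializer())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-with-keys").print();
        env.execute("key-example");
    }
}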


RE: Deterministic co-results

2022-12-08 Thread Salva Alcántara
Just for adding some extra references:

[5]
https://stackoverflow.com/questions/50536364/apache-flink-coflatmapfunction-does-not-process-events-in-event-time-order
[6]
https://stackoverflow.com/questions/61046062/controlling-order-of-processed-elements-within-coprocessfunction-using-custom-so
[7]
https://stackoverflow.com/questions/48692658/how-to-concatenate-two-streams-in-apache-flink/48711260#48711260

Salva

On 2022/12/07 18:52:42 Salva Alcántara wrote:
> It's well-known that Flink does not provide any guarantees on the order in
> which a CoProcessFunction (or one of its multiple variations) processes
its
> two inputs [1]. I wonder then what is the current best
practice/recommended
> approach for cases where one needs deterministic results in presence of:
>
> 1. A control stream
> 2. An event/data stream
>
> Let's consider event-time semantics; both streams have timestamps, and we
> want to implement "temporal join" semantics where the input events are
> controlled based on the latest control signals received before them, i.e.,
> the ones "active" when the events happened. For simplicity, let's assume
> that all events are received in order, so that the only source of
> non-determinism is the one introduced by the CoProcessFunction itself.
>
> I'm considering the following options:
>
> 1. Buffer events inside the CoProcessFunction for some time, while saving
> all the control signals in state (indexed by time)
> 2. Same as before but doing the pre-buffering of the event/data streams
> before the CoProcessFunction
> 3. Similar as before but considering both streams together by multiplexing
> them into one heterogeneous stream which would be pre-sorted in order to
> guarantee the global ordering of the events from the two different
sources.
> Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a
> ProcessFunction[Either[IN1, IN2], OUT] which by construction will process
> the data in order and hence produce deterministic results
>
> Essentially, all the strategies amount to introducing a "minimum amount of
> delay" to guarantee the deterministic processing, which brings me to the
> following question:
>
> * How to get an estimate for the out-of-order-ness bound that a
> CoProcessFunction can introduce? Is that even possible in the first place?
> I guess this mostly depends on the sources of the two streams and the
> relative ratio of records read. For simplicity we can consider a kafka
> source for both input streams...
>
> On a related note, the "temporal join" seems to be a well-documented and
> solved problem in the SQL API [2-3], but the problem is not even mentioned
> within the docs for the DataStream API. I guess I can copy/adapt the
> corresponding code. E.g., for the "point-in-time" part I can consider a
> TreeMap data structure as used in [3]. I have also come across a new
(still
> WIP) BinarySortedState [4] which would improve performance w.r.t. the
> current RocksDB state backend.
>
> References:
>
> [1]
>
https://stackoverflow.com/questions/51628037/facing-race-condition-in-flink-connected-stream-in-apache-flink
> [2]
>
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#event-time-temporal-join
> [3]
>
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#introduction-to-watermark-strategies
> [4]
>
https://www.slideshare.net/FlinkForward/introducing-binarysortedmultimap-a-new-flink-state-primitive-to-boost-your-application-performance
>


Re: Cleanup for high-availability.storageDir

2022-12-08 Thread Matthias Pohl via user
Yes, the wrong button was pushed when replying last time. -.-

Looking into the code once again [1], you're right. It looks like for
"last-state", no job is cancelled but the cluster deployment is just
deleted. I was assuming that the artifacts the documentation about the
JobResultStore resource leak [2] is referring to are the
JobResultStoreEntry files rather than other artifacts (e.g. jobgraphs). But
yeah, if we only delete the deployment, no Flink-internal cleanup is done.

I'm wondering what the reasoning behind that is.

[1]
https://github.com/apache/flink-kubernetes-operator/blob/ea01e294cf1b68d597244d0a11b3c81822a163e7/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L336
[2]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak

On Thu, Dec 8, 2022 at 11:04 AM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Matthias,
>
> I think you didn't include the mailing list in your response.
>
> According to my experiments, using last-state means the operator simply
> deletes the Flink pods, and I believe that doesn't count as Cancelled, so
> the artifacts for blobs and submitted job graphs are not cleaned up. I
> imagine the same logic Gyula mentioned before applies, namely keep the
> latest one and clean the older ones.
>
> Regards,
> Alexis.
>
> Am Do., 8. Dez. 2022 um 10:37 Uhr schrieb Matthias Pohl <
> matthias.p...@aiven.io>:
>
>> I see, I confused the Flink-internal recovery with what the Flink
>> Kubernetes Operator does for redeploying the Flink job. AFAIU, when you do
>> an upgrade of your job, the operator will cancel the Flink job (I'm
>> assuming now that you use Flink's Application mode rather than Session
>> mode). The operator cancelled your job and shuts down the cluster.
>> Checkpoints are retained and, therefore, can be used as the so-called "last
>> state" when redeploying your job using the same Job ID. In that case, the
>> corresponding jobGraph and other BLOBs should be cleaned up by Flink
>> itself. The checkpoint files are retained, i.e. survive the Flink cluster
>> shutdown.
>>
>> When redeploying the Flink cluster with the (updated) job, a new JobGraph
>> file is created by Flink internally. BLOBs are recreated as well. New
>> checkpoints are going to be created and old ones (that are not needed
>> anymore) are cleaned up.
>>
>> Just to recap what I said before (to make it more explicit to
>> differentiate what the k8s operator does and what Flink does internally):
>> Removing the artifacts you were talking about in your previous post would
>> harm Flink's internal recovery mechanism. That's probably not what you want.
>>
>> @Gyula: Please correct me if I misunderstood something here.
>>
>> I hope that helped.
>> Matthias
>>
>> On Wed, Dec 7, 2022 at 4:19 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> I see, thanks for the details.
>>>
>>> I do mean replacing the job without stopping it terminally.
>>> Specifically, I mean updating the container image with one that contains
>>> an updated job jar. Naturally, the new version must not break state
>>> compatibility, but as long as that is fulfilled, the job should be able to
>>> use the last checkpoint as starting point. It's my understanding that this
>>> is how the Kubernetes operator's "last-state" upgrade mode works [1].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>>
>>> Regards,
>>> Alexis.
>>>
>>> Am Mi., 7. Dez. 2022 um 15:54 Uhr schrieb Matthias Pohl <
>>> matthias.p...@aiven.io>:
>>>
 > - job_name/submittedJobGraphX
 submittedJobGraph* is the persisted JobGraph that would be picked up in
 case of a failover. Deleting this file would result in Flink's failure
 recovery not working properly anymore if the job is still executed but
 needs to be restarted because the actual job definition is gone.

 > completedCheckpointXYZ
 This is the persisted CompletedCheckpoint with a reference to the
 actual Checkpoint directory. Deleting this file would be problematic if the
 state recovery relies in some way on this specific checkpoint. The HA data
 relies on this file to be present. Failover only fails if there's no newer
 checkpoint or the HA data still refers to this checkpoint in some way.

 > - job_name/blob/job_uuid/blob_...
 Artifacts of the BlobServer containing runtime artifacts of the jobs
 (e.g. logs, libraries, ...)

 In general, you don't want to clean HA artifacts if the job hasn't
 reached a terminal state, yet, as it harms Flink's ability to recover the
 job. Additionally, these files are connected with the HA backend, i.e. the
 file path is stored in the HA backend. Removing the artifacts will likely
 result in metadata becoming invalid.
>

Re: Query about flink job manager dashboard

2022-12-08 Thread naga sudhakar
Is it possible to disable the dashboard port, but still have the API running
on a different port? Let us know if we can configure this.

On Mon, 5 Dec, 2022, 6:08 AM naga sudhakar,  wrote:

> Any suggestions for getting these APIs to work after applying this
> configuration? Basically, I suspect the web.upload.dir configuration is also
> not taken into account when these flags were set to false.
>
> web.submit.enable: Enables uploading and starting jobs through the Flink
> UI (true by default). Please note that even when this is disabled, session
> clusters still accept jobs through REST requests (HTTP calls). This flag
> only guards the feature to upload jobs in the UI.
> web.cancel.enable: Enables canceling jobs through the Flink UI (true by
> default). Please note that even when this is disabled, session clusters
> still cancel jobs through REST requests (HTTP calls). This flag only guards
> the feature to cancel jobs in the UI.
> web.upload.dir: The directory where to store uploaded jobs. Only used when
> web.submit.enable is true
>
> On Wed, 30 Nov, 2022, 10:46 AM naga sudhakar, 
> wrote:
>
>> After disabling the cancel and submit flags, I am facing issues with the
>> API calls below.
>>
>> 1) /jars giving 404
>> 2) /jars/upload
>> 3) /jars/{jarid}/run
>>
>> Are there any config changes needed to make these APIs work?
>>
>>
>> On Mon, 28 Nov, 2022, 7:00 PM naga sudhakar, 
>> wrote:
>>
>>> Hi,
>>> We are able to disable the cancel and upload options in the UI.
>>> But this is causing issues with the endpoints below.
>>> The GET call to /jars to list all uploaded jars and the POST call to
>>> /jars/{jarid}/run are returning 404 after disabling the two flags.
>>> Does the process of uploading jars and running a jar with a specific ID
>>> change after this change?
>>>
>>> Please suggest.
>>>
>>> Thanks & Regards,
>>> Nagasudhakar
>>>
>>> On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser, 
>>> wrote:
>>>
 Hi,

 1) No, that's currently not possible.
 2) You could consider disabling those options to disallow uploading new JARs
 and/or cancelling jobs from the UI. See
 https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui

 Best regards,

 Martijn

 On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar 
 wrote:

> Hi Team,
>> Greetings!!!
>> I am a software developer using Apache Flink and deploying Flink jobs
>> with it. I have two queries about the Flink JobManager dashboard. Can
>> you please help with the points below?
>>
>> 1) Is it possible to add a login mechanism for the Flink JobManager
>> dashboard and have a role-based mechanism for viewing running jobs,
>> cancelling jobs, and adding jobs?
>> 2) Is it possible to disable the dashboard display but still perform the
>> same operations using the API?
>>
>>
>> Thanks,
>> Nagasudhakar.
>>
>


Re: Cleanup for high-availability.storageDir

2022-12-08 Thread Alexis Sarda-Espinosa
Hi Matthias,

I think you didn't include the mailing list in your response.

According to my experiments, using last-state means the operator simply
deletes the Flink pods, and I believe that doesn't count as Cancelled, so
the artifacts for blobs and submitted job graphs are not cleaned up. I
imagine the same logic Gyula mentioned before applies, namely keep the
latest one and clean the older ones.

Regards,
Alexis.

Am Do., 8. Dez. 2022 um 10:37 Uhr schrieb Matthias Pohl <
matthias.p...@aiven.io>:

> I see, I confused the Flink-internal recovery with what the Flink
> Kubernetes Operator does for redeploying the Flink job. AFAIU, when you do
> an upgrade of your job, the operator will cancel the Flink job (I'm
> assuming now that you use Flink's Application mode rather than Session
> mode). The operator cancelled your job and shuts down the cluster.
> Checkpoints are retained and, therefore, can be used as the so-called "last
> state" when redeploying your job using the same Job ID. In that case, the
> corresponding jobGraph and other BLOBs should be cleaned up by Flink
> itself. The checkpoint files are retained, i.e. survive the Flink cluster
> shutdown.
>
> When redeploying the Flink cluster with the (updated) job, a new JobGraph
> file is created by Flink internally. BLOBs are recreated as well. New
> checkpoints are going to be created and old ones (that are not needed
> anymore) are cleaned up.
>
> Just to recap what I said before (to make it more explicit to
> differentiate what the k8s operator does and what Flink does internally):
> Removing the artifacts you were talking about in your previous post would
> harm Flink's internal recovery mechanism. That's probably not what you want.
>
> @Gyula: Please correct me if I misunderstood something here.
>
> I hope that helped.
> Matthias
>
> On Wed, Dec 7, 2022 at 4:19 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> I see, thanks for the details.
>>
>> I do mean replacing the job without stopping it terminally. Specifically,
>> I mean updating the container image with one that contains an updated job
>> jar. Naturally, the new version must not break state compatibility, but as
>> long as that is fulfilled, the job should be able to use the last
>> checkpoint as starting point. It's my understanding that this is how the
>> Kubernetes operator's "last-state" upgrade mode works [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>
>> Regards,
>> Alexis.
>>
>> Am Mi., 7. Dez. 2022 um 15:54 Uhr schrieb Matthias Pohl <
>> matthias.p...@aiven.io>:
>>
>>> > - job_name/submittedJobGraphX
>>> submittedJobGraph* is the persisted JobGraph that would be picked up in
>>> case of a failover. Deleting this file would result in Flink's failure
>>> recovery not working properly anymore if the job is still executed but
>>> needs to be restarted because the actual job definition is gone.
>>>
>>> > completedCheckpointXYZ
>>> This is the persisted CompletedCheckpoint with a reference to the actual
>>> Checkpoint directory. Deleting this file would be problematic if the state
>>> recovery relies in some way on this specific checkpoint. The HA data relies
>>> on this file to be present. Failover only fails if there's no newer
>>> checkpoint or the HA data still refers to this checkpoint in some way.
>>>
>>> > - job_name/blob/job_uuid/blob_...
>>> Artifacts of the BlobServer containing runtime artifacts of the jobs
>>> (e.g. logs, libraries, ...)
>>>
>>> In general, you don't want to clean HA artifacts if the job hasn't
>>> reached a terminal state, yet, as it harms Flink's ability to recover the
>>> job. Additionally, these files are connected with the HA backend, i.e. the
>>> file path is stored in the HA backend. Removing the artifacts will likely
>>> result in metadata becoming invalid.
>>>
>>> What do you mean with "testing updates *without* savepoints"? Are you
>>> referring to replacing the job's business logic without stopping the job?
>>>
>>> On Wed, Dec 7, 2022 at 3:17 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
 Hi Matthias,

 Then the explanation is likely that the job has not reached a terminal
 state. I was testing updates *without* savepoints (but with HA), so I guess
 that never triggers automatic cleanup.

 Since, in my case, the job will theoretically never reach a terminal
 state with this configuration, would it cause issues if I clean the
 artifacts manually?

 *And for completeness, I also see an artifact called
 completedCheckpointXYZ which is updated over time, and I imagine that
 should never be removed.

 Regards,
 Alexis.

 Am Mi., 7. Dez. 2022 um 13:03 Uhr schrieb Matthias Pohl <
 matthias.p...@aiven.io>:

> Flink should already take care of cleaning the artifacts you
> mentioned. Flink 1.15+ even i

Re: K8S operator support status

2022-12-08 Thread Márton Balassi
Hi Ruibin,

1. Standalone mode is indeed supported since 1.2 (
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#cluster-deployment-modes).
I will correct the Known Issues section; that is just an oversight that we
left it there - thanks for reporting.
2. Reactive mode is not explicitly documented for the operator, but to my
knowledge it works as expected.

On Thu, Dec 8, 2022 at 10:16 AM Ruibin Xing  wrote:

> Hi community,
>
> I'm looking into the Flink K8s operator documents, and I'm a bit confused
> about the following:
>
> 1. The latest document
> (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/)
> says that Kubernetes standalone mode is not supported yet. However, the
> announcement for Flink K8s operator 1.2.0 in
> https://flink.apache.org/news/2022/10/07/release-kubernetes-operator-1.2.0.html
> states that standalone deployment is now supported. I think this may be a
> document error?
>
> 2. The reactive mode. Does the k8s operator support the reactive mode
> now? Is there any documentation available for it?
>
> Thanks for your insights!
>


K8S operator support status

2022-12-08 Thread Ruibin Xing
Hi community,

I'm looking into the Flink K8s operator documents, and I'm a bit confused
about the following:

   1. The latest document
      (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/)
      says that Kubernetes standalone mode is not supported yet. However, the
      announcement for Flink K8s operator 1.2.0 in
      https://flink.apache.org/news/2022/10/07/release-kubernetes-operator-1.2.0.html
      states that standalone deployment is now supported. I think this may be a
      document error?

   2. The reactive mode. Does the k8s operator support the reactive mode now?
      Is there any documentation available for it?

Thanks for your insights!