Re: Flink Kubernetes Operator 1.8.0 CRDs

2024-05-28 Thread Alexis Sarda-Espinosa
Hello,

I've also noticed this in our Argo CD setup. Since priority=0 is the
default, Kubernetes accepts it but doesn't store it in the actual resource;
I'm guessing it behaves like a mutating admission hook that comes out of the
box. The "priority" property can be safely removed from the CRDs.

Regards,
Alexis.

Am Do., 9. Mai 2024 um 22:03 Uhr schrieb Prasad, Neil <
neil.pra...@activision.com>:

> Sorry, let me explain. I currently have the operator deployed and managed
> via ArgoCD. The CRDs I separated out into a different chart so I can do
> upgrades on them. I am working on upgrading from version 1.7.0 to 1.8.0
> using ArgoCD. What I’ve done is replace the CRDs in the separate chart and
> made sure ArgoCD runs a replace action against them. I then bumped the
> chart version to 1.8.0 like so:
>
> dependencies:
>   - name: flink-kubernetes-operator
>     version: 1.8.0
>     repository: https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/
>
>
>
> With those changes in place, ArgoCD still shows a diff specifically around
> the additionalPrinterColumns section. The newer CRD versions have a line of
> “priority: 0” under both Job Status and Lifecycle State but this is never
> reflected in the cluster, either by replacing via ArgoCD or by hand. This
> is the issue that I’m trying to resolve: how do I make sure this matches in
> the cluster when it doesn’t want to apply? Upgrading from 1.6.1 to 1.7.0 has
> the same issue where that line (priority: 0) isn’t reflected.
>
>
>
> Now this doesn’t hurt the functionality we need, but if we want to make
> sure the CRDs are the same as those bundled in a release, then it’ll always
> show a diff.
>
>
>
> *From: *Márton Balassi 
> *Date: *Thursday, May 9, 2024 at 3:50 PM
> *To: *Prasad, Neil 
> *Cc: *user@flink.apache.org 
> *Subject: *Re: Flink Kubernetes Operator 1.8.0 CRDs
>
>
> Hi,
>
>
>
> What do you mean exactly by cannot be applied or replaced? What exactly is
> the issue?
>
>
>
> Are you installing fresh or trying to upgrade from a previous version? If
> the latter please follow this:
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/upgrade/#2-upgrading-the-crd
> 
>
>
>
> On Thu, May 9, 2024 at 9:18 PM Prasad, Neil 
> wrote:
>
> I am writing to report an issue with the Flink Kubernetes Operator version
> 1.8.0. The CRD cannot be applied or replaced in minikube or GKE. However,
> the CRD works on version 1.7.0 of the operator. I thought it would be
> helpful to bring this issue to the attention of the community and get some
> help in case someone has run into this issue before.
>
>
>
> Thank you for your attention to this matter.
>
>


Re: Need help in understanding PojoSerializer

2024-03-20 Thread Alexis Sarda-Espinosa
Hi Sachin,

Check the last few comments I wrote in this thread:

https://lists.apache.org/thread/l71d1cqo9xv8rsw0gfjo19kb1pct2xj1
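
In short, the idea there is to annotate the problematic fields with @TypeInfo
and custom TypeInfoFactory classes so that Flink uses ListSerializer and
MapSerializer instead of falling back to Kryo. A rough sketch (the element
types String/Double are just assumptions, since they aren't visible in your
snippet, and the Set has to become a List because Flink has no built-in Set
serializer):

import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class A {

    public String str;

    // Set replaced by a List so that Flink's ListSerializer applies
    @TypeInfo(StringListFactory.class)
    public List<String> aList;

    @TypeInfo(StringDoubleMapFactory.class)
    public Map<String, Double> dMap;

    public static class StringListFactory extends TypeInfoFactory<List<String>> {
        @Override
        public TypeInformation<List<String>> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            // ListTypeInfo, which serializes with ListSerializer
            return Types.LIST(Types.STRING);
        }
    }

    public static class StringDoubleMapFactory extends TypeInfoFactory<Map<String, Double>> {
        @Override
        public TypeInformation<Map<String, Double>> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            // MapTypeInfo, which serializes with MapSerializer
            return Types.MAP(Types.STRING, Types.DOUBLE);
        }
    }
}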

Regards,
Alexis.

On Wed, 20 Mar 2024, 18:51 Sachin Mittal,  wrote:

> Hi,
> I saw the post but I did not understand how I would configure these fields
> to use those serializers. (I can change the set type to a list type for
> now).
> As per the docs I see that we can annotate fields with @TypeInfo
>
> But what I did not get is how, using this annotation, I can use
> ListSerializer and MapSerializer.
>
> Thanks
> Sachin
>
>
> On Wed, Mar 20, 2024 at 10:47 PM Ken Krugler 
> wrote:
>
>> Flink doesn’t have built-in support for serializing Sets.
>>
>> See this (stale) issue about the same:
>> https://issues.apache.org/jira/browse/FLINK-16729
>>
>> You could create a custom serializer for sets, see
>> https://stackoverflow.com/questions/59800851/flink-serialization-of-java-util-list-and-java-util-map
>> and
>> https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/typeutils/base/ListSerializer.html
>> for details on how this was done for a list, but it’s not trivial.
>>
>> Or as a hack, use a Map and the existing support for map
>> serialization via
>> https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/typeutils/base/MapSerializer.html
>>
>> — Ken
>>
>>
>> On Mar 20, 2024, at 10:04 AM, Sachin Mittal  wrote:
>>
>> Hi,
>> I have a Pojo class like this
>>
>> public class A {
>>
>> public String str;
>>
>> public Set aSet;
>>
>> public Map dMap;
>>
>> }
>>
>> However when I start the flink program I get this message:
>>
>> org.apache.flink.api.java.typeutils.TypeExtractor[] - Field A#
>> dMap will be processed as GenericType. Please read the Flink
>> documentation on "Data Types & Serialization" for details of the effect on
>> performance and schema evolution.
>>
>> org.apache.flink.api.java.typeutils.TypeExtractor[] - Field A#
>> aSet will be processed as GenericType. Please read the Flink
>> documentation on "Data Types & Serialization" for details of the effect on
>> performance and schema evolution.
>>
>> Also in my code I have added
>>
>> env.getConfig().disableGenericTypes();
>>
>> So I don't understand: when I use Maps and Sets of primitive types, why is
>> Flink not able to use PojoSerializer for these fields, even though I have
>> disabled generic types? Why am I getting a message that they will be
>> processed as GenericType?
>>
>> Any help in understanding what I need to do to ensure all the fields of my
>> object are handled using PojoSerializer would be appreciated.
>>
>>
>> Thanks
>>
>> Sachin
>>
>>
>>
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink & Pinot
>>
>>
>>
>>


Re: Impact of RocksDB backend on the Java heap

2024-02-19 Thread Alexis Sarda-Espinosa
Hi Zakelly,

Yeah, that makes sense to me. I was just curious about whether reading could
be a bottleneck or not, but I imagine user-specific logic would be better
than a generic cache from Flink that might have a low hit rate.

Thanks again,
Alexis.

On Mon, 19 Feb 2024, 07:29 Zakelly Lan,  wrote:

> Hi Alexis,
>
> Assuming the bulk load for a batch of sequential keys performs better than
> accessing them one by one, the main question becomes whether we really need
> to access all the keys that were bulk-loaded into the cache beforehand. In
> other words, cache hit rate is the key issue. If the rate is high, even
> though a single key-value is large and loading them is slow, it is still
> worth it to load them in advance. In the case of timers and iteration (which
> I missed in my last mail), the cache is almost guaranteed to hit. Thus a
> cache is introduced to enhance the performance here.
>
>
> Best,
> Zakelly
>
> On Sun, Feb 18, 2024 at 7:42 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Zakelly,
>>
>> thanks for the information, that's interesting. Would you say that
>> reading a subset from RocksDB is fast enough to be pretty much negligible,
>> or could it be a bottleneck if the state of each key is "large"? Again
>> assuming the number of distinct partition keys is large.
>>
>> Regards,
>> Alexis.
>>
>> On Sun, 18 Feb 2024, 05:02 Zakelly Lan,  wrote:
>>
>>> Hi Alexis,
>>>
>>> Flink does need some heap memory to bridge requests to RocksDB and
>>> gather the results. In most cases, the memory is discarded immediately
>>> (eventually collected by GC). In the case of timers, Flink does cache a
>>> limited subset of key-values in heap to improve performance.
>>>
>>> In general you don't need to consider its heap consumption since it is
>>> minor.
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Fri, Feb 16, 2024 at 4:43 AM Asimansu Bera 
>>> wrote:
>>>
>>>> Hello Alexis,
>>>>
>>>> I don't think data in RocksDB resides in JVM even with function calls.
>>>>
>>>> For more details, check the link below:
>>>>
>>>> https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture
>>>>
>>>> RocksDB has three main components - memtable, sstfile and WAL (not used
>>>> in Flink as Flink uses checkpointing). When a TM starts with RocksDB as the
>>>> state backend, the TM has its own RocksDB instance and the state is managed
>>>> as column families by that TM. Any changes of state go into memtable -->
>>>> sst --> persistent store. When read, data goes to the buffers and cache of
>>>> RocksDB.
>>>>
>>>> In the case of RocksDB as the state backend, the JVM still holds thread
>>>> stacks; with a high degree of parallelism, there are many stacks
>>>> maintaining separate thread information.
>>>>
>>>> Hope this helps!!
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Feb 15, 2024 at 11:21 AM Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hi Asimansu
>>>>>
>>>>> The memory RocksDB manages is outside the JVM, yes, but the mentioned
>>>>> subsets must be bridged to the JVM somehow so that the data can be exposed
>>>>> to the functions running inside Flink, no?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>
>>>>> On Thu, 15 Feb 2024, 14:06 Asimansu Bera, 
>>>>> wrote:
>>>>>
>>>>>> Hello Alexis,
>>>>>>
>>>>>> RocksDB resides off-heap and outside of the JVM. The small subset of data
>>>>>> ends up off-heap in memory.
>>>>>>
>>>>>> For more details, check the following link:
>>>>>>
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup_tm/#managed-memory
>>>>>>
>>>>>> I hope this addresses your inquiry.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 15, 2024 at 12:52 AM Alexis Sarda-Espinosa <
>>>>>> sarda.espin...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Most info regarding RocksDB memory for Flink focuses on what's
>>>>>>> needed independently of the JVM (although the Flink process configures 
>>>>>>> its
>>>>>>> limits and so on). I'm wondering if there are additional special
>>>>>>> considerations with regards to the JVM heap in the following scenario.
>>>>>>>
>>>>>>> Assuming a key used to partition a Flink stream and its state has a
>>>>>>> high cardinality, but that the state of each key is small, when Flink
>>>>>>> prepares the state to expose to a user function during a call (with a 
>>>>>>> given
>>>>>>> partition key), I guess it loads only the required subset from RocksDB, 
>>>>>>> but
>>>>>>> does this small subset end (temporarily) up on the JVM heap? And if it
>>>>>>> does, does it stay "cached" in the JVM for some time or is it 
>>>>>>> immediately
>>>>>>> discarded after the user function completes?
>>>>>>>
>>>>>>> Maybe this isn't even under Flink's control, but I'm curious.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Alexis.
>>>>>>>
>>>>>>


Re: Impact of RocksDB backend on the Java heap

2024-02-18 Thread Alexis Sarda-Espinosa
Hi Zakelly,

thanks for the information, that's interesting. Would you say that reading
a subset from RocksDB is fast enough to be pretty much negligible, or could
it be a bottleneck if the state of each key is "large"? Again assuming the
number of distinct partition keys is large.

Regards,
Alexis.

On Sun, 18 Feb 2024, 05:02 Zakelly Lan,  wrote:

> Hi Alexis,
>
> Flink does need some heap memory to bridge requests to RocksDB and gather
> the results. In most cases, the memory is discarded immediately (eventually
> collected by GC). In the case of timers, Flink does cache a limited subset
> of key-values in heap to improve performance.
>
> In general you don't need to consider its heap consumption since it is
> minor.
>
>
> Best,
> Zakelly
>
> On Fri, Feb 16, 2024 at 4:43 AM Asimansu Bera 
> wrote:
>
>> Hello Alexis,
>>
>> I don't think data in RocksDB resides in JVM even with function calls.
>>
>> For more details, check the link below:
>>
>> https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture
>>
>> RocksDB has three main components - memtable, sstfile and WAL (not used in
>> Flink as Flink uses checkpointing). When a TM starts with RocksDB as the
>> state backend, the TM has its own RocksDB instance and the state is managed
>> as column families by that TM. Any changes of state go into memtable -->
>> sst --> persistent store. When read, data goes to the buffers and cache of
>> RocksDB.
>>
>> In the case of RocksDB as the state backend, the JVM still holds thread
>> stacks; with a high degree of parallelism, there are many stacks
>> maintaining separate thread information.
>>
>> Hope this helps!!
>>
>>
>>
>>
>>
>> On Thu, Feb 15, 2024 at 11:21 AM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hi Asimansu
>>>
>>> The memory RocksDB manages is outside the JVM, yes, but the mentioned
>>> subsets must be bridged to the JVM somehow so that the data can be exposed
>>> to the functions running inside Flink, no?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>> On Thu, 15 Feb 2024, 14:06 Asimansu Bera, 
>>> wrote:
>>>
>>>> Hello Alexis,
>>>>
>>>> RocksDB resides off-heap and outside of the JVM. The small subset of data
>>>> ends up off-heap in memory.
>>>>
>>>> For more details, check the following link:
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup_tm/#managed-memory
>>>>
>>>> I hope this addresses your inquiry.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Feb 15, 2024 at 12:52 AM Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Most info regarding RocksDB memory for Flink focuses on what's needed
>>>>> independently of the JVM (although the Flink process configures its limits
>>>>> and so on). I'm wondering if there are additional special considerations
>>>>> with regards to the JVM heap in the following scenario.
>>>>>
>>>>> Assuming a key used to partition a Flink stream and its state has a
>>>>> high cardinality, but that the state of each key is small, when Flink
>>>>> prepares the state to expose to a user function during a call (with a 
>>>>> given
>>>>> partition key), I guess it loads only the required subset from RocksDB, 
>>>>> but
>>>>> does this small subset end (temporarily) up on the JVM heap? And if it
>>>>> does, does it stay "cached" in the JVM for some time or is it immediately
>>>>> discarded after the user function completes?
>>>>>
>>>>> Maybe this isn't even under Flink's control, but I'm curious.
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>


Re: Impact of RocksDB backend on the Java heap

2024-02-15 Thread Alexis Sarda-Espinosa
Hi Asimansu

The memory RocksDB manages is outside the JVM, yes, but the mentioned
subsets must be bridged to the JVM somehow so that the data can be exposed
to the functions running inside Flink, no?

Regards,
Alexis.


On Thu, 15 Feb 2024, 14:06 Asimansu Bera,  wrote:

> Hello Alexis,
>
> RocksDB resides off-heap and outside of the JVM. The small subset of data
> ends up off-heap in memory.
>
> For more details, check the following link:
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup_tm/#managed-memory
>
> I hope this addresses your inquiry.
>
>
>
>
> On Thu, Feb 15, 2024 at 12:52 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Most info regarding RocksDB memory for Flink focuses on what's needed
>> independently of the JVM (although the Flink process configures its limits
>> and so on). I'm wondering if there are additional special considerations
>> with regards to the JVM heap in the following scenario.
>>
>> Assuming a key used to partition a Flink stream and its state has a high
>> cardinality, but that the state of each key is small, when Flink prepares
>> the state to expose to a user function during a call (with a given
>> partition key), I guess it loads only the required subset from RocksDB, but
>> does this small subset end (temporarily) up on the JVM heap? And if it
>> does, does it stay "cached" in the JVM for some time or is it immediately
>> discarded after the user function completes?
>>
>> Maybe this isn't even under Flink's control, but I'm curious.
>>
>> Regards,
>> Alexis.
>>
>


Impact of RocksDB backend on the Java heap

2024-02-15 Thread Alexis Sarda-Espinosa
Hello,

Most info regarding RocksDB memory for Flink focuses on what's needed
independently of the JVM (although the Flink process configures its limits
and so on). I'm wondering if there are additional special considerations
with regards to the JVM heap in the following scenario.

Assuming a key used to partition a Flink stream and its state has a high
cardinality, but that the state of each key is small, when Flink prepares
the state to expose to a user function during a call (with a given
partition key), I guess it loads only the required subset from RocksDB, but
does this small subset end (temporarily) up on the JVM heap? And if it
does, does it stay "cached" in the JVM for some time or is it immediately
discarded after the user function completes?

Maybe this isn't even under Flink's control, but I'm curious.

Regards,
Alexis.


Watermark alignment without idleness

2024-02-06 Thread Alexis Sarda-Espinosa
Hello,

I was reading through the comments in [1] and it seems that enabling
watermark alignment implicitly activates some idleness logic "if the source
waits for alignment for a long time" (even if withIdleness is not called
explicitly during the creation of WatermarkStrategy). Is this time somehow
configurable? I believe this isn't documented.

[1] https://issues.apache.org/jira/browse/FLINK-32496

Regards,
Alexis.


Re: Request to provide sample codes on Data stream using flink, Spring Boot & kafka

2024-02-06 Thread Alexis Sarda-Espinosa
Hello,

check this thread from some months ago, but keep in mind that it's not
really officially supported by Flink itself:

https://lists.apache.org/thread/l0pgm9o2vdywffzdmbh9kh7xorhfvj40

Regards,
Alexis.

Am Di., 6. Feb. 2024 um 12:23 Uhr schrieb Fidea Lidea <
lideafidea...@gmail.com>:

> Hi Team,
>
> I request you to provide sample codes on data streaming using flink, kafka
> and spring boot.
>
> Awaiting your response.
>
> Thanks & Regards
> Nida Shaikh
>


Re: Idleness not working if watermark alignment is used

2024-02-06 Thread Alexis Sarda-Espinosa
Hi Matthias,

I think I understand the implications of idleness. In my case I really do
need it since even in the production environment one of the Kafka topics
will receive messages only sporadically.

With regards to the code, I have a very limited understanding of Flink
internals, but the part I linked seems to indicate that, if a stream is
idle, the log should indicate a hard-coded maxAllowedWatermark equal to
Long.MAX_VALUE; that's why I thought the source isn't really being
considered idle.

Regards,
Alexis.

Am Di., 6. Feb. 2024 um 11:46 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Hi Alexis,
>
>
>
> Yes, I guess so, while not utterly acquainted with that part of the code.
>
> Apparently the SourceCoordinator cannot come up with a proper watermark
> time if watermarking is turned off (idle mode of the stream), and then it
> derives the watermark time from the remaining non-idle sources.
>
> It’s consistent with how idling-state of data streams is designed.
>
> However, the point remains that one needs to compensate for
> .withIdleness(…) if correctness is any consideration.
>
> Using .withIdleness(…) is IMHO only justified in rare cases where
> implications are fully understood.
>
>
>
> If a source is not configured with .withIdleness(…) and becomes factually
> idle, all window aggregations or stateful stream joins stall until that
> source becomes active again (= added latency)
>
>
>
> Thias
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Tuesday, February 6, 2024 9:48 AM
> *To:* Schwalbe Matthias 
> *Cc:* user 
> *Subject:* Re: Idleness not working if watermark alignment is used
>
>
> Hi Matthias,
>
>
>
> thanks for looking at this. Would you then say this comment in the source
> code is not really valid?
>
>
> https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181
>
>
>
> That's where the log I was looking at is created.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> Am Di., 6. Feb. 2024 um 08:54 Uhr schrieb Schwalbe Matthias <
> matthias.schwa...@viseca.ch>:
>
> Good morning Alexis,
>
>
>
> withIdleness(…) is easily misunderstood: it actually means that the
> thus-configured stream is exempt from watermark processing after 5 seconds
> (in your case).
>
> Hence also watermark alignment is turned off for the stream until a new
> event arrives.
>
>
>
> .withIdleness(…) is good for situations where you prefer low latency over
> correctness (causality with respect to time order).
>
> Downstream operators can choose a manual implementation of watermark
> behavior in order to compensate for the missing watermarks.
>
>
>
> IMHO, because I see so many people make the same mistake, I would rather
> rename .withIdleness(…) to something like .idleWatermarkExemption(…) to
> make it more obvious.
>
>
>
> Hope this helps
>
>
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Monday, February 5, 2024 6:04 PM
> *To:* user 
> *Subject:* Re: Idleness not working if watermark alignment is used
>
>
>
> Ah and I forgot to mention, this is with Flink 1.18.1
>
>
>
> Am Mo., 5. Feb. 2024 um 18:00 Uhr schrieb Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com>:
>
> Hello,
>
>
>
> I have 2 Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
>
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
> .withIdleness(idleTimeout) // 5 seconds currently
> .withWatermarkAlignment(alignmentGroup,
> maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>
>
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
>
>
> Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
> subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
>
>
> where maxAllowedWatermark grows all the time.
>
>
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
>
>
> Regards,
>
> Alexis.
>
>
>

Re: Idleness not working if watermark alignment is used

2024-02-06 Thread Alexis Sarda-Espinosa
Hi Matthias,

thanks for looking at this. Would you then say this comment in the source
code is not really valid?
https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181

That's where the log I was looking at is created.

Regards,
Alexis.

Am Di., 6. Feb. 2024 um 08:54 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Good morning Alexis,
>
>
>
> withIdleness(…) is easily misunderstood: it actually means that the
> thus-configured stream is exempt from watermark processing after 5 seconds
> (in your case).
>
> Hence also watermark alignment is turned off for the stream until a new
> event arrives.
>
>
>
> .withIdleness(…) is good for situations where you prefer low latency over
> correctness (causality with respect to time order).
>
> Downstream operators can choose a manual implementation of watermark
> behavior in order to compensate for the missing watermarks.
>
>
>
> IMHO, because I see so many people make the same mistake, I would rather
> rename .withIdleness(…) to something like .idleWatermarkExemption(…) to
> make it more obvious.
>
>
>
> Hope this helps
>
>
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Monday, February 5, 2024 6:04 PM
> *To:* user 
> *Subject:* Re: Idleness not working if watermark alignment is used
>
>
>
> Ah and I forgot to mention, this is with Flink 1.18.1
>
>
>
> Am Mo., 5. Feb. 2024 um 18:00 Uhr schrieb Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com>:
>
> Hello,
>
>
>
> I have 2 Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
>
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
> .withIdleness(idleTimeout) // 5 seconds currently
> .withWatermarkAlignment(alignmentGroup,
> maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>
>
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
>
>
> Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
> subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
>
>
> where maxAllowedWatermark grows all the time.
>
>
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
>
>
> Regards,
>
> Alexis.
>
>
>
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>


Re: Idleness not working if watermark alignment is used

2024-02-05 Thread Alexis Sarda-Espinosa
Ah and I forgot to mention, this is with Flink 1.18.1

Am Mo., 5. Feb. 2024 um 18:00 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hello,
>
> I have 2 Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
> .withIdleness(idleTimeout) // 5 seconds currently
> .withWatermarkAlignment(alignmentGroup,
> maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
> Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
> subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
> where maxAllowedWatermark grows all the time.
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
> Regards,
> Alexis.
>
>


Idleness not working if watermark alignment is used

2024-02-05 Thread Alexis Sarda-Espinosa
Hello,

I have 2 Kafka sources that are configured with a watermark strategy
instantiated like this:

WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
.withIdleness(idleTimeout) // 5 seconds currently
.withWatermarkAlignment(alignmentGroup,
maxAllowedWatermarkDrift, Duration.ofSeconds(1L))

The alignment group is the same for both, but each one consumes from a
different topic. During a test, I ensured that one of the topics didn't
receive any messages, but when I check the logs I see multiple entries like
this:

Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.

where maxAllowedWatermark grows all the time.

Maybe my understanding is wrong, but I think this means the source is never
marked as idle even though it didn't receive any new messages in the Kafka
topic?

Regards,
Alexis.


Watermark alignment with different allowed drifts

2024-02-05 Thread Alexis Sarda-Espinosa
Hello,

is the behavior of this configuration well defined: assigning two
different (Kafka) sources to the same alignment group but configuring a
different max allowed drift in each one?

Regards,
Alexis.


Re: Flink KafkaProducer Failed Transaction Stalling the whole flow

2023-12-18 Thread Alexis Sarda-Espinosa
Hi Dominik,

Sounds like it could be this?
https://issues.apache.org/jira/browse/FLINK-28060

It doesn't mention transactions but I'd guess it could be the same
mechanism.

Regards,
Alexis.

On Mon, 18 Dec 2023, 07:51 Dominik Wosiński,  wrote:

> Hey,
> I've got a question regarding the transaction failures in EXACTLY_ONCE
> flow with Flink 1.15.3 with Confluent Cloud Kafka.
>
> The case is that there is a FlinkKafkaProducer in EXACTLY_ONCE setup with
> the default transaction.timeout.ms of
> 15 min.
>
> During processing, the job had some issues that caused a checkpoint to
> time out, which in turn caused transaction issues, and the transaction
> failed with the following logs:
> Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@5d0d5082)
> because its producer is already fenced. This means that you either have a
> different producer with the same 'transactional.id' (this is unlikely
> with the 'KafkaSink' as all generated ids are unique and shouldn't be
> reused) or recovery took longer than 'transaction.timeout.ms' (90ms).
> In both cases this most likely signals data loss, please consult the Flink
> documentation for more details.
> Up to this point everything is pretty clear. After that however, the job
> continued to work normally but every single transaction was failing with:
> Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@5a924600)
> because it's in an invalid state. Most likely the transaction has been
> aborted for some reason. Please check the Kafka logs for more details.
> Which effectively stalls all downstream processing because no transaction
> would be ever commited.
>
> I've read through the docs and understand that this is kind of a known
> issue due to the fact that Kafka doesn't effectively support 2PC, but why
> doesn't that cause the failure and restart of the whole job? Currently, the
> job will process everything normally and hide the issue until it has grown
> catastrophically.
>
> Thanks in advance,
> Cheers.
>


Re: Avoid dynamic classloading in native mode with Kubernetes Operator

2023-11-20 Thread Alexis Sarda-Espinosa
Hi Trystan, I'm actually not very familiar with the operator's internals,
but I'd guess that limitation is in Flink itself - application mode is a
feature from core Flink, and the operator just configures it based on the
CRDs it defines. Maybe one of the maintainers can confirm.

Regards,
Alexis.

On Mon, 20 Nov 2023, 19:25 Trystan,  wrote:

> Thanks Alexis, I can give that a try. However, that seems less than ideal
> from the user's perspective.
>
> Is there a technical reason why the operator can't support this
> combination of modes? I'd really like to just let the system do its thing
> rather than build a complicated two-jar approach.
>
> Thanks,
> Trystan
>
> On Fri, Nov 17, 2023 at 12:19 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Trystan,
>>
>> I imagine you can create 2 jars: one should only have a class with the
>> main method, and the other should be a fat jar with everything else for
>> your job. If you create a custom image where your fat jar is placed under
>> /opt/flink/lib/ then I think it would "just work" when specifying the
>> main-method jar in jarURI.
>>
>> Nevertheless, even though Flink shadows a lot of the libraries they use
>> internally, I suppose you could still end up with dependency conflicts, so
>> you would probably have some added complexity managing what's bundled in
>> your fat jar.
>>
>> Regards,
>> Alexis.
>>
>> Am Do., 16. Nov. 2023 um 19:42 Uhr schrieb Trystan :
>>
>>> Is it possible to avoid dynamic classloading when using the operator
>>> with a native kubernetes application deployment?
>>>
>>> If I put the job jar into /opt/flinklib, then there are two possible
>>> outcomes:
>>>
>>>1. If I point jarURI to the jar, I get linkage errors (presumably:
>>>the classes have already been loaded by the AppClassLoader and the
>>>FlinkUserCodeClassLoader).
>>>2. If I do not include jarURI the operator pods encounter a
>>>NullPointerException. The docs state this is optional, but appears to 
>>> only
>>>pertain to standalone mode.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-29288 enabled the optional
>>> jarURI (apparently only for standalone deployments).
>>>
>>> Are there any additional configurations (configs, jar locations, etc)
>>> that are needed to avoid dynamic classloading in this case?
>>>
>>


Re: Avoid dynamic classloading in native mode with Kubernetes Operator

2023-11-17 Thread Alexis Sarda-Espinosa
Hi Trystan,

I imagine you can create 2 jars: one should only have a class with the main
method, and the other should be a fat jar with everything else for your
job. If you create a custom image where your fat jar is placed under
/opt/flink/lib/ then I think it would "just work" when specifying the
main-method jar in jarURI.
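
In other words, the thin jar referenced by jarURI would contain nothing but a
tiny entry point along these lines (both class names here are made up for
illustration):

// the only class bundled in the thin jar pointed to by jarURI;
// it simply delegates to the real job's main class, which lives in the
// fat jar placed under /opt/flink/lib/
public class ThinEntrypoint {
    public static void main(String[] args) throws Exception {
        com.example.RealJobMain.main(args);
    }
}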

Nevertheless, even though Flink shadows a lot of the libraries they use
internally, I suppose you could still end up with dependency conflicts, so
you would probably have some added complexity managing what's bundled in
your fat jar.

Regards,
Alexis.

Am Do., 16. Nov. 2023 um 19:42 Uhr schrieb Trystan :

> Is it possible to avoid dynamic classloading when using the operator with
> a native kubernetes application deployment?
>
> If I put the job jar into /opt/flinklib, then there are two possible
> outcomes:
>
>1. If I point jarURI to the jar, I get linkage errors (presumably: the
>    classes have already been loaded by the AppClassLoader and the
>FlinkUserCodeClassLoader).
>2. If I do not include jarURI the operator pods encounter a
>NullPointerException. The docs state this is optional, but appears to only
>pertain to standalone mode.
>
> https://issues.apache.org/jira/browse/FLINK-29288 enabled the optional
> jarURI (apparently only for standalone deployments).
>
> Are there any additional configurations (configs, jar locations, etc) that
> are needed to avoid dynamic classloading in this case?
>


Re: dependency error with latest Kafka connector

2023-11-14 Thread Alexis Sarda-Espinosa
Isn't it expected that it points to 1.17? That version of the Kafka
connector is meant to be compatible with both Flink 1.17 and 1.18, right?
So the older version should be specified so that the consumer can decide
which Flink version to compile against; otherwise the build tool could
silently update the compile-only dependencies, no?

Regards,
Alexis.

Am Di., 14. Nov. 2023 um 11:54 Uhr schrieb Alexey Novakov via user <
user@flink.apache.org>:

> Hi Günterh,
>
> It looks like a problem with the Kafka connector release.
> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/3.0.1-1.18
> Compile dependencies are still pointing to Flink 1.17.
>
> The release manager has already been contacted about this, or will be contacted soon.
>
> Best regards,
> Alexey
>
> On Mon, Nov 13, 2023 at 10:42 PM guenterh.lists 
> wrote:
>
>> Hello
>>
>> I'm getting a dependency error when using the latest Kafka connector in
>> a Scala project.
>>
>> Using the 1.17.1 Kafka connector, compilation is ok.
>>
>> With
>>
>> "org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18"
>>
>> I get
>> [error] (update) sbt.librarymanagement.ResolveException: Error
>> downloading org.apache.flink:flink-connector-base:
>> [error]   Not found
>> [error]   Not found
>> [error]   not found:
>>
>> /home/swissbib/.ivy2/local/org.apache.flink/flink-connector-base/ivys/ivy.xml
>> [error]   not found:
>>
>> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-base//flink-connector-base-.pom
>>
>> Seems Maven packaging is not correct.
>>
>> My sbt build file:
>>
>> ThisBuild / scalaVersion := "3.3.0"
>> val flinkVersion = "1.18.0"
>> val postgresVersion = "42.2.2"
>>
>> lazy val root = (project in file(".")).settings(
>>name := "flink-scala-proj",
>>libraryDependencies ++= Seq(
>>  "org.flinkextended" %% "flink-scala-api" % "1.17.1_1.1.0",
>>  "org.apache.flink" % "flink-clients" % flinkVersion % Provided,
>>  "org.apache.flink" % "flink-connector-files" % flinkVersion %
>> Provided,
>>
>>"org.apache.flink" % "flink-connector-kafka" % "1.17.1",
>>//"org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18",
>>
>>//"org.apache.flink" % "flink-connector-jdbc" % "3.1.1-1.17",
>>//"org.postgresql" % "postgresql" % postgresVersion,
>>"org.apache.flink" % "flink-connector-files" % flinkVersion % Provided,
>>//"org.apache.flink" % "flink-connector-base" % flinkVersion % Provided
>>)
>> )
>>
>>
>>
>> Thanks!
>>
>> --
>> Günter Hipler
>> https://openbiblio.social/@vog61
>> https://twitter.com/vog61
>>
>>


Re: Continuous errors with Azure ABFSS

2023-11-10 Thread Alexis Sarda-Espinosa
After enabling some more logging for the storage account, I figured out the
errors correspond to 404 PathNotFound responses. My guess is the file
system checks the status of a path to see if it exists or not before
trying to write to it, in this case for _metadata files from each new
checkpoint. Seems like normal operations, so it's just unfortunate the
Azure API exposes that as continuous ClientOtherError metrics.

Regards,
Alexis.

Am Fr., 6. Okt. 2023 um 08:10 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Yes, that also works correctly, at least based on the Kafka source we use
> (we'd get an alert if it suddenly started consuming from a very old offset).
>
> Regards,
> Alexis.
>
> On Thu, 5 Oct 2023, 19:36 ramkrishna vasudevan, 
> wrote:
>
>> Sorry for the late reply. Just in case you restart the job , is it able
>> to safely use the checkpoint and get back to the checkpointed state?
>>
>> Regards
>> Ram,
>>
>> On Thu, Sep 28, 2023 at 4:46 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hi Surendra,
>>>
>>> there are no exceptions in the logs, nor anything salient with
>>> INFO/WARN/ERROR levels. The checkpoints are definitely completing, we even
>>> set the config
>>>
>>> execution.checkpointing.tolerable-failed-checkpoints: 1
>>>
>>> Regards,
>>> Alexis.
>>>
>>> Am Do., 28. Sept. 2023 um 09:32 Uhr schrieb Surendra Singh Lilhore <
>>> surendralilh...@gmail.com>:
>>>
>>>> Hi Alexis,
>>>>
>>>> Could you please check the TaskManager log for any exceptions?
>>>>
>>>> Thanks
>>>> Surendra
>>>>
>>>>
>>>> On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We are using ABFSS for RocksDB's backend as well as the storage dir
>>>>> required for Kubernetes HA. In the Azure Portal's monitoring insights I 
>>>>> see
>>>>> that every single operation contains failing transactions for the
>>>>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>>>>> know the storage account is only used by Flink. Checkpointing isn't
>>>>> failing, but I wonder if this could be an issue in the long term?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>


Re: Disable flink old checkpoint clean

2023-11-08 Thread Alexis Sarda-Espinosa
Hello,

maybe someone can correct me if I'm wrong, but reading through [1], it
seems to me that manually triggered checkpoints were meant for these
scenarios. If the implementation follows the ticket's description, a
user-triggered checkpoint would "break the chain of incremental
checkpoints", which would allow a safer activation of S3 TTL?

[1] https://issues.apache.org/jira/browse/FLINK-27101

Regards,
Alexis.

Am Mi., 8. Nov. 2023 um 06:51 Uhr schrieb Jinzhong Li <
lijinzhong2...@gmail.com>:

> Hi Yang,
>
> I think there is no configuration option available that allows users to
> disable checkpoint file cleanup at runtime.
>
> Does your flink application use incremental checkpoint?
> 1) If yes, i think leveraging S3's lifecycle management to clean
> checkpoint files is not safe, because it may accidentally delete a file
> which is still in use, although the probability is small.
> 2) If no, you can try to enable incremental checkpoint and increase the
> checkpoint interval to reduce the S3 traffic.
>
> Yang LI  于2023年11月8日周三 04:58写道:
>
>> Hi Martijn,
>>
>>
>> We're currently utilizing flink-s3-fs-presto. After reviewing the
>> flink-s3-fs-hadoop source code, I believe we would encounter similar issues
>> with it as well.
>>
>> When we say, 'The purpose of a checkpoint, in principle, is that Flink
>> manages its lifecycle,' I think it implies that the automatic cleanup of
>> old checkpoints is an integral part of Flink's lifecycle management.
>> However, is there a configuration option available that allows us to
>> disable this automatic cleanup? We're considering leveraging AWS S3's
>> lifecycle management capabilities to handle this aspect instead of relying
>> on Flink.
>>
>> Best,
>> Yang
>>
>> On Tue, 7 Nov 2023 at 18:44, Martijn Visser 
>> wrote:
>>
>>> Ah, I actually misread checkpoint and savepoints, sorry. The purpose
>>> of a checkpoint in principle is that Flink manages its lifecycle.
>>> Which S3 interface are you using for the checkpoint storage?
>>>
>>> On Tue, Nov 7, 2023 at 6:39 PM Martijn Visser 
>>> wrote:
>>> >
>>> > Hi Yang,
>>> >
>>> > If you use the NO_CLAIM mode, Flink will not assume ownership of a
>>> > snapshot and leave it up to the user to delete them. See the blog [1]
>>> > for more details.
>>> >
>>> > Best regards,
>>> >
>>> > Martijn
>>> >
>>> > [1]
>>> https://flink.apache.org/2022/05/06/improvements-to-flink-operations-snapshots-ownership-and-savepoint-formats/#no_claim-default-mode
>>> >
>>> > On Tue, Nov 7, 2023 at 5:29 PM Junrui Lee  wrote:
>>> > >
>>> > > Hi Yang,
>>> > >
>>> > >
>>> > > You can try configuring
>>> "execution.checkpointing.externalized-checkpoint-retention:
>>> RETAIN_ON_CANCELLATION"[1] and increasing the value of
>>> "state.checkpoints.num-retained"[2] to retain more checkpoints.
>>> > >
>>> > >
>>> > > Here are the official documentation links for more details:
>>> > >
>>> > > [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention
>>> > >
>>> > > [2]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#state-checkpoints-num-retained
>>> > >
>>> > >
>>> > > Best,
>>> > >
>>> > > Junrui
>>> > >
>>> > >
>>> > > Yang LI  于2023年11月7日周二 22:02写道:
>>> > >>
>>> > >> Dear Flink Community,
>>> > >>
>>> > >> In our Flink application, we persist checkpoints to AWS S3.
>>> Recently, during periods of high job parallelism and traffic, we've
>>> experienced checkpoint failures. Upon investigating, it appears these may
>>> be related to S3 delete object requests interrupting checkpoint re-uploads,
>>> as evidenced by numerous InterruptedExceptions.
>>> > >>
>>> > >> We aim to explore options for disabling the deletion of stale
>>> checkpoints. Despite consulting the Flink configuration documentation and
>>> conducting various tests, the appropriate setting to prevent old checkpoint
>>> cleanup remains elusive.
>>> > >>
>>> > >> Could you advise if there's a method to disable the automatic
>>> cleanup of old Flink checkpoints?
>>> > >>
>>> > >> Best,
>>> > >> Yang
>>>
>>


Re: Updating existing state with state processor API

2023-10-27 Thread Alexis Sarda-Espinosa
Hi Matthias,

Thanks for the response. I guess the specific question would be: if I work
with an existing savepoint and pass an empty DataStream to
OperatorTransformation#bootstrapWith, will the new savepoint end up with an
empty state for the modified operator, or will it maintain the existing
state because nothing was changed?
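
To make the question concrete, this is roughly what I have in mind; just a
sketch, where the operator uid ("stats"), the state shape (a ValueState<Long>
named "count") and the savepoint paths are all made up:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.SavepointWriter;
import org.apache.flink.state.api.StateBootstrapTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class PatchSavepoint {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String oldPath = "file:///tmp/old-savepoint";
        String newPath = "file:///tmp/new-savepoint";

        // 1. read the operator's existing keyed state into a DataStream
        SavepointReader reader = SavepointReader.read(env, oldPath, new HashMapStateBackend());
        DataStream<Tuple2<String, Long>> existing =
                reader.readKeyedState(OperatorIdentifier.forUid("stats"), new CountReader());

        // 2. (optionally union with extra records and) bootstrap the state again
        StateBootstrapTransformation<Tuple2<String, Long>> transformation =
                OperatorTransformation.bootstrapWith(existing)
                        .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Long> value) {
                                return value.f0;
                            }
                        })
                        .transform(new CountWriter());

        // 3. write a new savepoint where only this operator's state is replaced
        SavepointWriter.fromExistingSavepoint(env, oldPath, new HashMapStateBackend())
                .removeOperator(OperatorIdentifier.forUid("stats"))
                .withOperator(OperatorIdentifier.forUid("stats"), transformation)
                .write(newPath);

        env.execute("patch-savepoint");
    }

    // reads the assumed "count" state of every key into (key, count) tuples
    static class CountReader extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
        private ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            out.collect(Tuple2.of(key, count.value()));
        }
    }

    // writes the (key, count) tuples back into the same "count" state
    static class CountWriter extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
            count.update(value.f1);
        }
    }
}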

Regards,
Alexis.

Am Fr., 27. Okt. 2023 um 08:40 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Good morning Alexis,
>
>
>
> Something like this we do all the time.
>
> Read an existing savepoint, copy some of the not-to-be-changed operator
> states (keyed/non-keyed) over, and process/patch the remaining ones by
> transforming and bootstrapping to new state.
>
>
>
> I could spare more details for more specific questions, if you like 
>
>
>
> Regards
>
>
>
> Thias
>
>
>
> PS: I’m currently working on this ticket in order to get some glitches
> removed: FLINK-26585 <https://issues.apache.org/jira/browse/FLINK-26585>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Thursday, October 26, 2023 4:01 PM
> *To:* user 
> *Subject:* Updating existing state with state processor API
>
>
>
> Hello,
>
>
>
> The documentation of the state processor API has some examples to modify
> an existing savepoint by defining a StateBootstrapTransformation. In all
> cases, the entrypoint is OperatorTransformation#bootstrapWith, which
> expects a DataStream. If I pass an empty DataStream to bootstrapWith and
> then apply the resulting transformation to an existing savepoint, will the
> transformation still receive data from the existing state?
>
>
>
> If the aforementioned is incorrect, I imagine I could instantiate
> a SavepointReader and create a DataStream of the existing state with it,
> which I could then pass to the bootstrapWith method directly or after
> "unioning" it with additional state. Would this work?
>
>
>
> Regards,
>
> Alexis.
>
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>


Updating existing state with state processor API

2023-10-26 Thread Alexis Sarda-Espinosa
Hello,

The documentation of the state processor API has some examples to modify an
existing savepoint by defining a StateBootstrapTransformation. In all
cases, the entrypoint is OperatorTransformation#bootstrapWith, which
expects a DataStream. If I pass an empty DataStream to bootstrapWith and
then apply the resulting transformation to an existing savepoint, will the
transformation still receive data from the existing state?

If the aforementioned is incorrect, I imagine I could instantiate
a SavepointReader and create a DataStream of the existing state with it,
which I could then pass to the bootstrapWith method directly or after
"unioning" it with additional state. Would this work?

Regards,
Alexis.


Re: Continuous errors with Azure ABFSS

2023-10-06 Thread Alexis Sarda-Espinosa
Yes, that also works correctly, at least based on the Kafka source we use
(we'd get an alert if it suddenly started consuming from a very old offset).

Regards,
Alexis.

On Thu, 5 Oct 2023, 19:36 ramkrishna vasudevan, 
wrote:

> Sorry for the late reply. Just in case you restart the job , is it able to
> safely use the checkpoint and get back to the checkpointed state?
>
> Regards
> Ram,
>
> On Thu, Sep 28, 2023 at 4:46 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Surendra,
>>
>> there are no exceptions in the logs, nor anything salient with
>> INFO/WARN/ERROR levels. The checkpoints are definitely completing, we even
>> set the config
>>
>> execution.checkpointing.tolerable-failed-checkpoints: 1
>>
>> Regards,
>> Alexis.
>>
>> Am Do., 28. Sept. 2023 um 09:32 Uhr schrieb Surendra Singh Lilhore <
>> surendralilh...@gmail.com>:
>>
>>> Hi Alexis,
>>>
>>> Could you please check the TaskManager log for any exceptions?
>>>
>>> Thanks
>>> Surendra
>>>
>>>
>>> On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> We are using ABFSS for RocksDB's backend as well as the storage dir
>>>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>>>> that every single operation contains failing transactions for the
>>>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>>>> know the storage account is only used by Flink. Checkpointing isn't
>>>> failing, but I wonder if this could be an issue in the long term?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>


Re: Continuous errors with Azure ABFSS

2023-09-28 Thread Alexis Sarda-Espinosa
Hi Surendra,

there are no exceptions in the logs, nor anything salient with
INFO/WARN/ERROR levels. The checkpoints are definitely completing, we even
set the config

execution.checkpointing.tolerable-failed-checkpoints: 1

Regards,
Alexis.

Am Do., 28. Sept. 2023 um 09:32 Uhr schrieb Surendra Singh Lilhore <
surendralilh...@gmail.com>:

> Hi Alexis,
>
> Could you please check the TaskManager log for any exceptions?
>
> Thanks
> Surendra
>
>
> On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> We are using ABFSS for RocksDB's backend as well as the storage dir
>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>> that every single operation contains failing transactions for the
>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>> know the storage account is only used by Flink. Checkpointing isn't
>> failing, but I wonder if this could be an issue in the long term?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Continuous errors with Azure ABFSS

2023-09-28 Thread Alexis Sarda-Espinosa
Hi Ram,

Thanks for that. We configure a path with ABFSS scheme in the following
settings:

- state.checkpoints.dir
- state.savepoints.dir
- high-availability.storageDir

We use RocksDB with incremental checkpointing every minute.

I found the metrics from Azure in the storage account under Monitoring,
Insights, Failures, scrolling down. I'll attach a screenshot here, although
I'm not sure that works well with the distribution list.

Regards,
Alexis.

Am Do., 28. Sept. 2023 um 07:28 Uhr schrieb ramkrishna vasudevan <
ramvasu.fl...@gmail.com>:

> Can you help with more info here?
> The RocksDB backend itself is in ABFS instead of local? Or you mean the
> checkpoint is in ABFS but local dir for RocksDB is in local storage?
>
> GetPathStatus is done by your monitoring pages? We run Flink on ABFS so we
> would like to see if we can help you out.
>
> Regards
> Ram
>
> On Thu, Sep 28, 2023 at 2:06 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> We are using ABFSS for RocksDB's backend as well as the storage dir
>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>> that every single operation contains failing transactions for the
>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>> know the storage account is only used by Flink. Checkpointing isn't
>> failing, but I wonder if this could be an issue in the long term?
>>
>> Regards,
>> Alexis.
>>
>>


Continuous errors with Azure ABFSS

2023-09-27 Thread Alexis Sarda-Espinosa
Hello,

We are using ABFSS for RocksDB's backend as well as the storage dir
required for Kubernetes HA. In the Azure Portal's monitoring insights I see
that every single operation contains failing transactions for the
GetPathStatus API. Unfortunately I don't see any additional details, but I
know the storage account is only used by Flink. Checkpointing isn't
failing, but I wonder if this could be an issue in the long term?

Regards,
Alexis.


Re: Side outputs documentation

2023-09-26 Thread Alexis Sarda-Espinosa
I see, sounds good, thanks for the clarification.

Am Di., 26. Sept. 2023 um 03:29 Uhr schrieb Yunfeng Zhou <
flink.zhouyunf...@gmail.com>:

> Hi Alexis,
>
> Thanks for the clarification. I found the second constructor on
> Flink's master branch here[1], and maybe we have simply been commenting
> on different versions of Flink and the second constructor has not yet
> been introduced in the version you use. From the source code I
> can see that the OutputTag need not be anonymous so long as the type
> extraction process passes, while making it anonymous guarantees the
> success of this step, so you are right that you need not bother about
> this matter so long as your tests and jobs can pass. Besides, I wonder
> whether being a static field influences the anonymity of a variable.
> To my understanding, making it anonymous means coding `new
> OutputTag("foobar"){}` instead of  `new
> OutputTag("foobar")`. It doesn't matter whether the prefix is
> `private OutputTag tag = new OutputTag("foobar"){}` or
> `private static OutputTag tag = new
> OutputTag("foobar"){}`. They should be independent from each
> other and OutputTag's document is correct from this aspect.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/OutputTag.java#L82
>
> Best,
> Yunfeng
>
> On Mon, Sep 25, 2023 at 10:57 PM Alexis Sarda-Espinosa
>  wrote:
> >
> > Hi Yunfeng,
> >
> > Thanks for the response. I hadn't even seen the other constructor, but
> it seems that the single-arg constructor works fine even if the output tag
> is declared as "static final", at least in my use case. I imagine Flink
> would complain about unknown types if it really can't figure it out
> automatically, so maybe I can just let it be as long as tests pass, but I
> was wondering if Flink really needs a non-static field to analyze type
> information here. Who knows, maybe there are some scenarios where it's
> really a must.
> >
> > Regards,
> > Alexis.
> >
> > Am Mo., 25. Sept. 2023 um 05:17 Uhr schrieb Yunfeng Zhou <
> flink.zhouyunf...@gmail.com>:
> >>
> >> Hi Alexis,
> >>
> >> If you create OutputTag with the constructor `OutputTag(String id)`,
> >> you need to make it anonymous for Flink to analyze the type
> >> information. But if you use the constructor `OutputTag(String id,
> >> TypeInformation typeInfo)`, you need not make it anonymous as you
> >> have provided the type information.
> >>
> >> The second constructor was introduced after the documentation and the
> >> first constructor, and I think the documentation might be outdated and
> >> not match OutputTag's current behavior. A ticket and PR could be
> >> added to fix the documentation. What do you think?
> >>
> >> Best,
> >> Yunfeng
> >>
> >> On Fri, Sep 22, 2023 at 4:55 PM Alexis Sarda-Espinosa
> >>  wrote:
> >> >
> >> > Hello,
> >> >
> >> > very quick question, the documentation for side outputs states that
> an OutputTag "needs to be an anonymous inner class, so that we can analyze
> the type" (this is written in a comment in the example). Is this really
> true? I've seen many examples where it's a static element and it seems to
> work fine.
> >> >
> >> > Regards,
> >> > Alexis.
> >> >
>


Re: Side outputs documentation

2023-09-25 Thread Alexis Sarda-Espinosa
Hi Yunfeng,

Thanks for the response. I hadn't even seen the other constructor, but it
seems that the single-arg constructor works fine even if the output tag is
declared as "static final", at least in my use case. I imagine Flink would
complain about unknown types if it really can't figure it out
automatically, so maybe I can just let it be as long as tests pass, but I
was wondering if Flink really needs a non-static field to analyze type
information here. Who knows, maybe there are some scenarios where it's
really a must.

Regards,
Alexis.

On Mon, Sep 25, 2023 at 5:17 AM, Yunfeng Zhou <
flink.zhouyunf...@gmail.com> wrote:

> Hi Alexis,
>
> If you create OutputTag with the constructor `OutputTag(String id)`,
> you need to make it anonymous for Flink to analyze the type
> information. But if you use the constructor `OutputTag(String id,
> TypeInformation typeInfo)`, you need not make it anonymous as you
> have provided the type information.
>
> The second constructor was introduced after the document and the first
> constructor, and I think the document might be outdated and no longer
> match OutputTag's current behavior. A ticket and PR could be
> added to fix the document. What do you think?
>
> Best,
> Yunfeng
>
> On Fri, Sep 22, 2023 at 4:55 PM Alexis Sarda-Espinosa
>  wrote:
> >
> > Hello,
> >
> > very quick question, the documentation for side outputs states that an
> OutputTag "needs to be an anonymous inner class, so that we can analyze the
> type" (this is written in a comment in the example). Is this really true?
> I've seen many examples where it's a static element and it seems to work
> fine.
> >
> > Regards,
> > Alexis.
> >
>


Side outputs documentation

2023-09-22 Thread Alexis Sarda-Espinosa
Hello,

very quick question, the documentation for side outputs states that an
OutputTag "needs to be an anonymous inner class, so that we can analyze the
type" (this is written in a comment in the example). Is this really true?
I've seen many examples where it's a static element and it seems to work
fine.

Regards,
Alexis.


Re: Failure to restore from last completed checkpoint

2023-09-08 Thread Alexis Sarda-Espinosa
Hello,

Just a shot in the dark here, but could it be related to
https://issues.apache.org/jira/browse/FLINK-32241 ?

Such failures can cause many exceptions, but I think the ones you've
included aren't pointing to the root cause, so I'm not sure if that issue
applies to you.

Regards,
Alexis.

On Fri, 8 Sept 2023, 20:43 Jacqlyn Bender via user, 
wrote:

> Hi Yanfei,
>
> We were never able to restore from a checkpoint, we ended up restoring
> from a savepoint as fallback. Would those logs suggest we failed to take a
> checkpoint before the job manager restarted? Our observability monitors
> showed no failed checkpoints.
>
> Here is an exception that occurred before the failure to restore from the
> checkpoint:
>
> java.io.IOException: Cannot register Closeable, registry is already
> closed. Closing argument.
>
> at
> org.apache.flink.util.AbstractAutoCloseableRegistry.registerCloseable(AbstractAutoCloseableRegistry.java:89)
> ~[a-pipeline-name.jar:1.0]
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:128)
> ~[flink-dist-1.17.1-shopify-81a88f8.jar:1.17.1-shopify-81a88f8]
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
> ~[flink-dist-1.17.1-shopify-81a88f8.jar:1.17.1-shopify-81a88f8]
>
> at
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
> ~[a-pipeline-name.jar:1.0]
>
> at
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
> ~[?:?] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> ~[?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ~[?:?]
>
> at java.lang.Thread.run(Thread.java:829) [?:?]
>
> Thanks,
> Jacqlyn
>
> On Thu, Sep 7, 2023 at 7:42 PM Yanfei Lei  wrote:
>
>> Hey Jacqlyn,
>> According to the stack trace, it seems that there is a problem when
>> the checkpoint is triggered. Is this the problem after the restore?
>> would you like to share some logs related to restoring?
>>
>> Best,
>> Yanfei
>>
>> On Fri, Sep 8, 2023 at 05:11, Jacqlyn Bender via user  wrote:
>> >
>> > Hey folks,
>> >
>> >
>> > We experienced a pipeline failure where our job manager restarted and
>> we were for some reason unable to restore from our last successful
>> checkpoint. We had regularly completed checkpoints every 10 minutes up to
>> this failure and 0 failed checkpoints logged. Using Flink version 1.17.1.
>> >
>> >
>> > Wondering if anyone can shed light on what might have happened?
>> >
>> >
>> > Here's the error from our logs:
>> >
>> >
>> > Message: FATAL: Thread ‘Checkpoint Timer’ produced an uncaught
>> exception. Stopping the process...
>> >
>> >
>> > extendedStackTrace: java.util.concurrent.CompletionException:
>> java.util.concurrent.CompletionException: java.lang.NullPointerException
>> >
>> > at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$8(CheckpointCoordinator.java:669)
>> ~[a-pipeline-name.jar:1.0]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:910)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> [?:?]
>> >
>> > at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>> >
>> > at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> [?:?]
>> >
>> > at java.lang.Thread.run(Thread.java:829) [?:?]
>> >
>> > Caused by: java.util.concurrent.CompletionException:
>> java.lang.NullPointerException
>> >
>> > at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:932)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>> ~[?:?]
>> >
>> > ... 7 more
>> >
>> > Caused by: java.lang.NullPointerException
>> >
>> > at
>> 

Semantics of purging with global windows

2023-08-30 Thread Alexis Sarda-Espinosa
Hello,

According to the javadoc of TriggerResult.PURGE, "All elements in the
window are cleared and the window is discarded, without evaluating
the window function or emitting any elements."
However, I've noticed that using a GlobalWindow (with a custom trigger)
followed by an AggregateFunction will call the function's add() even when
the trigger result is PURGE.
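
For concreteness, the setup I mean has roughly this shape; the event class,
the purge condition and the aggregate below are placeholders:

```
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class PurgingGlobalWindowSketch {

    // Placeholder event type; isReset() stands in for whatever condition
    // decides that the window content should be discarded.
    public static class MyEvent {
        public String key;
        public boolean reset;

        public String getKey() { return key; }
        public boolean isReset() { return reset; }
    }

    public static class PurgeOnResetTrigger extends Trigger<MyEvent, GlobalWindow> {
        @Override
        public TriggerResult onElement(MyEvent element, long timestamp,
                                       GlobalWindow window, TriggerContext ctx) {
            return element.isReset() ? TriggerResult.PURGE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) {
            // no timers or state registered by this trigger
        }
    }

    public static class CountingAggregate implements AggregateFunction<MyEvent, Long, Long> {
        @Override
        public Long createAccumulator() { return 0L; }

        // This is the add() I see being invoked even when the trigger purges.
        @Override
        public Long add(MyEvent value, Long accumulator) { return accumulator + 1; }

        @Override
        public Long getResult(Long accumulator) { return accumulator; }

        @Override
        public Long merge(Long a, Long b) { return a + b; }
    }

    public static DataStream<Long> apply(DataStream<MyEvent> events) {
        return events
                .keyBy(MyEvent::getKey)
                .window(GlobalWindows.create())
                .trigger(new PurgeOnResetTrigger())
                .aggregate(new CountingAggregate());
    }
}
```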

It seems to me that this has been the behavior for a very long time:

https://github.com/apache/flink/commit/6cd8ceb10c841827cf89b74ecf5a0495a6933d53#diff-6d18531a35cddca6e5995c40c7a564fd711b998d567c4e167a401f76ca29a2bbR295-R299

Is that really necessary? I'm guessing that operator deals with all types
of windows, so I'm not sure how that affects other window types.

Regards,
Alexis.


Re: Question about serialization of java.util classes

2023-08-15 Thread Alexis Sarda-Espinosa
Check this answer: https://stackoverflow.com/a/64721838/5793905

You could then use, for example, something like: new
SetTypeInfo(Types.STRING) instead of Types.LIST(Types.STRING)
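
To show one place where such a TypeInformation ends up being consumed, here is
a small sketch; SetTypeInfo is the helper class from the gist (assumed to be
copied into your project), and a constructor taking only the element type info
is an assumption based on the usage above:

```
import java.util.LinkedHashSet;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class SetStateSketch {
    // SetTypeInfo comes from the gist linked above; its exact signature may differ.
    public static ValueStateDescriptor<LinkedHashSet<String>> descriptor() {
        TypeInformation<LinkedHashSet<String>> setInfo = new SetTypeInfo<>(Types.STRING);
        // With explicit type information, Flink does not fall back to
        // Kryo/GenericType for the set.
        return new ValueStateDescriptor<>("string-set", setInfo);
    }
}
```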

On Tue, Aug 15, 2023 at 10:40 AM,  wrote:

> Hello Alexis,
>
> Thank you for sharing the helper classes, but unfortunately I have no
> idea how to use these classes or how they might be able to help me. This is
> all very new to me and I honestly can't wrap my head around Flink's type
> information system.
>
> Best regards,
> Saleh.
>
> On 14 Aug 2023, at 4:05 PM, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
> Hello,
>
> AFAIK you cannot avoid TypeInformationFactory due to type erasure, nothing
> Flink can do about that. Here's an example of helper classes I've been
> using to support set serde in Flink POJOs, but note that it's hardcoded for
> LinkedHashSet, so you would have to create different implementations if you
> need to differentiate sorted sets:
>
> https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398
>
> Regards,
> Alexis.
>
>
> On Mon, Aug 14, 2023 at 12:14 PM,  wrote:
>
>> Hi,
>>
>> Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
>> ```
>> package com.example;
>> import java.util.ArrayList;
>> import java.util.HashSet;
>> import java.util.TreeSet;
>>
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> public class App {
>> public static class Pojo {
>> public ArrayList list;
>> public HashSet set;
>> public TreeSet treeset;
>> public Pojo() {
>> this.list = new ArrayList<>();
>> this.set = new HashSet<>();
>> this.treeset = new TreeSet<>();
>> }
>> }
>> public static void main(String[] args) throws Exception {
>> var env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.getConfig().disableGenericTypes();
>> env.fromElements(new Pojo()).print();
>> env.execute("Pipeline");
>> }
>> }
>> ```
>>
>> The result of running:
>> ```
>> 13:08:20,074 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - class java.util.ArrayList does not contain a setter for field
>> size
>> 13:08:20,077 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Class class java.util.ArrayList cannot be used as a POJO
>> type because not all fields are valid POJO fields, and must be processed as
>> GenericType. Please read the Flink documentation on "Data Types
>> & Serialization" for details of the effect on performance and schema
>> evolution.
>> 13:08:20,078 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Field Pojo#list will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - No fields were detected for class java.util.HashSet so it
>> cannot be used as a POJO type and must be processed as GenericType. Please
>> read the Flink documentation on "Data Types & Serialization" for details
>> of the effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Field Pojo#set will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - No fields were detected for class java.util.TreeSet so it
>> cannot be used as a POJO type and must be processed as GenericType. Please
>> read the Flink documentation on "Data Types & Serialization" for details
>> of the effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Field Pojo#sset will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> WARNING: An illegal reflective access operation has occurred
>> WARNING: Illegal reflective access by
>> org.apache.flink.api.java.ClosureCleaner
>> (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar)

Re: Question about serialization of java.util classes

2023-08-14 Thread Alexis Sarda-Espinosa
Hello,

AFAIK you cannot avoid TypeInformationFactory due to type erasure, nothing
Flink can do about that. Here's an example of helper classes I've been
using to support set serde in Flink POJOs, but note that it's hardcoded for
LinkedHashSet, so you would have to create different implementations if you
need to differentiate sorted sets:

https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398

Regards,
Alexis.


On Mon, Aug 14, 2023 at 12:14 PM,  wrote:

> Hi,
>
> Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
> ```
> package com.example;
> import java.util.ArrayList;
> import java.util.HashSet;
> import java.util.TreeSet;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> public class App {
> public static class Pojo {
> public ArrayList list;
> public HashSet set;
> public TreeSet treeset;
> public Pojo() {
> this.list = new ArrayList<>();
> this.set = new HashSet<>();
> this.treeset = new TreeSet<>();
> }
> }
> public static void main(String[] args) throws Exception {
> var env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().disableGenericTypes();
> env.fromElements(new Pojo()).print();
> env.execute("Pipeline");
> }
> }
> ```
>
> The result of running:
> ```
> 13:08:20,074 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - class java.util.ArrayList does not contain a setter for field size
> 13:08:20,077 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Class class java.util.ArrayList cannot be used as a POJO
> type because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types
> & Serialization" for details of the effect on performance and schema
> evolution.
> 13:08:20,078 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Field Pojo#list will be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - No fields were detected for class java.util.HashSet so it cannot
> be used as a POJO type and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Field Pojo#set will be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - No fields were detected for class java.util.TreeSet so it cannot
> be used as a POJO type and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Field Pojo#sset will be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar)
> to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been disabled in the ExecutionConfig and
> type java.util.ArrayList is treated as a generic type.
> at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
>
> at 
> 

Task manager creation in Flink native Kubernetes (application mode)

2023-07-25 Thread Alexis Sarda-Espinosa
Hi everyone,

From its inception (at least AFAIK), application mode for native Kubernetes
has always created "unmanaged" pods for task managers. I would like to know
if there are any specific benefits to this, or if on the other hand there
are specific reasons not to use Kubernetes Deployments instead.

In my case, I ask for a very specific reason. With the current approach, it
is almost impossible to determine if a task manager crash was due to an OOM
kill, given that there isn't any kind of history for the unmanaged pods.

I could add that these TM pods also confuse Argo CD and their state is
always "progressing". That's not so critical, but I don't know if anyone
else finds that odd.

I would be happy to know what others think.

Regards,
Alexis.


Re: Checkpointing and savepoints can never complete after inconsistency

2023-07-10 Thread Alexis Sarda-Espinosa
I found out someone else reported this and found a workaround:
https://issues.apache.org/jira/browse/FLINK-32241

On Mon, Jul 10, 2023 at 4:45 PM, Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi again,
>
> I have found out that this issue occurred in 3 different clusters, and 2
> of them could not recover after restarting pods, it seems state was
> completely corrupted afterwards and was thus lost. I had never seen this
> before 1.17.1, so it might be a newly introduced problem.
>
> Regards,
> Alexis.
>
>
> On Mon, Jul 10, 2023 at 11:07 AM, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> we have just experienced a weird issue in one of our Flink clusters which
>> might be difficult to reproduce, but I figured I would document it in case
>> some of you know what could have gone wrong. This cluster had been running
>> with Flink 1.16.1 for a long time and was recently updated to 1.17.1. It
>> ran fine for a few days, but suddenly all checkpoints started failing
>> (RocksDB + Azure ABFSS + Kubernetes HA). I don't see anything interesting
>> in the logs right before the problem started, just afterwards:
>>
>> Jul 9, 2023 @
>> 18:13:41.271 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
>> Completed
>> checkpoint 187398 for job 3d85035b76921c0a905f6c4fade06eca (19891956 bytes,
>> checkpointDuration=443 ms, finalizationTime=103 ms).
>> Jul 9, 2023 @
>> 18:14:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
>> Triggering
>> checkpoint 187399 (type=CheckpointType{name='Checkpoint',
>> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919280725 for job
>> 3d85035b76921c0a905f6c4fade06eca.
>> Jul 9, 2023 @
>> 18:15:05.472 org.apache.kafka.clients.NetworkClient [AdminClient
>> clientId=flink-enumerator-admin-client] Node 0 disconnected.
>> Jul 9, 2023 @
>> 18:15:10.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
>> Checkpoint
>> 187399 of job 3d85035b76921c0a905f6c4fade06eca expired before completing.
>> Jul 9, 2023 @
>> 18:15:10.741 org.apache.flink.runtime.checkpoint.CheckpointFailureManager 
>> Failed
>> to trigger or complete checkpoint 187399 for job
>> 3d85035b76921c0a905f6c4fade06eca. (0 consecutive failed attempts so far)
>> Jul 9, 2023 @
>> 18:15:10.905 
>> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable asynchronous
>> part of checkpoint 187399 could not be completed.
>> Jul 9, 2023 @
>> 18:15:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
>> Triggering
>> checkpoint 187400 (type=CheckpointType{name='Checkpoint',
>> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919340725 for job
>> 3d85035b76921c0a905f6c4fade06eca.
> Jul 9, 2023 @
>> 18:15:40.957 org.apache.flink.runtime.state.SharedStateRegistryImpl 
>> Duplicated
>> registration under key
>> ec5b73d0-a04c-4574-b380-7981c7173d80-KeyGroupRange{startKeyGroup=60,
>> endKeyGroup=119}-016511.sst of a new state:
>> ByteStreamStateHandle{handleName='abfss://.../checkpoints/3d85035b76921c0a905f6c4fade06eca/shared/2eddc140-51c8-4575-899f-e70ca71f95be',
>> dataBytes=1334}. This might happen during the task failover if state
>> backend creates different states with the same key before and after the
>> failure. Discarding the OLD state and keeping the NEW one which is included
>> into a completed checkpoint
>>
>> This last line appeared multiple times, and after that all checkpoints
>> failed. At some point, this exception also started appearing:
>>
>> org.apache.flink.runtime.jobmaster.JobMaster Error while processing
>> AcknowledgeCheckpoint message
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> finalize the pending checkpoint 188392. Failure reason: Failure to finalize
>> checkpoint.
>> ...
>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>> FileSystem for scheme "file"
>>
>> In the Kubernetes HA CM I can see that the value under ".data.counter"
>> was still increasing, not sure if that's expected, but since we configured
>> only 1 allowable consecutive checkpoint failure, the job kept restarting
>> every 2 minutes. Trying to create savepoints also failed with
>> UnsupportedFileSystemException.
>>
>> Manually deleting the Flink cluster's pods was enough to bring it back to
>> a working state. Maybe the handling for this error is not working correctly?
>>
>> Regards,
>> Alexis.
>>
>


Re: Checkpointing and savepoints can never complete after inconsistency

2023-07-10 Thread Alexis Sarda-Espinosa
Hi again,

I have found out that this issue occurred in 3 different clusters, and 2 of
them could not recover after restarting pods, it seems state was completely
corrupted afterwards and was thus lost. I had never seen this before
1.17.1, so it might be a newly introduced problem.

Regards,
Alexis.


On Mon, Jul 10, 2023 at 11:07 AM, Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> we have just experienced a weird issue in one of our Flink clusters which
> might be difficult to reproduce, but I figured I would document it in case
> some of you know what could have gone wrong. This cluster had been running
> with Flink 1.16.1 for a long time and was recently updated to 1.17.1. It
> ran fine for a few days, but suddenly all checkpoints started failing
> (RocksDB + Azure ABFSS + Kubernetes HA). I don't see anything interesting
> in the logs right before the problem started, just afterwards:
>
> Jul 9, 2023 @
> 18:13:41.271 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
> Completed
> checkpoint 187398 for job 3d85035b76921c0a905f6c4fade06eca (19891956 bytes,
> checkpointDuration=443 ms, finalizationTime=103 ms).
> Jul 9, 2023 @
> 18:14:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
> Triggering
> checkpoint 187399 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919280725 for job
> 3d85035b76921c0a905f6c4fade06eca.
> Jul 9, 2023 @
> 18:15:05.472 org.apache.kafka.clients.NetworkClient [AdminClient
> clientId=flink-enumerator-admin-client] Node 0 disconnected.
> Jul 9, 2023 @
> 18:15:10.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
> Checkpoint
> 187399 of job 3d85035b76921c0a905f6c4fade06eca expired before completing.
> Jul 9, 2023 @
> 18:15:10.741 org.apache.flink.runtime.checkpoint.CheckpointFailureManager 
> Failed
> to trigger or complete checkpoint 187399 for job
> 3d85035b76921c0a905f6c4fade06eca. (0 consecutive failed attempts so far)
> Jul 9, 2023 @
> 18:15:10.905 org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable 
> asynchronous
> part of checkpoint 187399 could not be completed.
> Jul 9, 2023 @
> 18:15:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
> Triggering
> checkpoint 187400 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919340725 for job
> 3d85035b76921c0a905f6c4fade06eca.
> Jul 9, 2023 @
> 18:15:40.957 org.apache.flink.runtime.state.SharedStateRegistryImpl Duplicated
> registration under key
> ec5b73d0-a04c-4574-b380-7981c7173d80-KeyGroupRange{startKeyGroup=60,
> endKeyGroup=119}-016511.sst of a new state:
> ByteStreamStateHandle{handleName='abfss://.../checkpoints/3d85035b76921c0a905f6c4fade06eca/shared/2eddc140-51c8-4575-899f-e70ca71f95be',
> dataBytes=1334}. This might happen during the task failover if state
> backend creates different states with the same key before and after the
> failure. Discarding the OLD state and keeping the NEW one which is included
> into a completed checkpoint
>
> This last line appeared multiple times, and after that all checkpoints
> failed. At some point, this exception also started appearing:
>
> org.apache.flink.runtime.jobmaster.JobMaster Error while processing
> AcknowledgeCheckpoint message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> finalize the pending checkpoint 188392. Failure reason: Failure to finalize
> checkpoint.
> ...
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "file"
>
> In the Kubernetes HA CM I can see that the value under ".data.counter" was
> still increasing, not sure if that's expected, but since we configured only
> 1 allowable consecutive checkpoint failure, the job kept restarting every 2
> minutes. Trying to create savepoints also failed with
> UnsupportedFileSystemException.
>
> Manually deleting the Flink cluster's pods was enough to bring it back to
> a working state. Maybe the handling for this error is not working correctly?
>
> Regards,
> Alexis.
>


Checkpointing and savepoints can never complete after inconsistency

2023-07-10 Thread Alexis Sarda-Espinosa
Hello,

we have just experienced a weird issue in one of our Flink clusters which
might be difficult to reproduce, but I figured I would document it in case
some of you know what could have gone wrong. This cluster had been running
with Flink 1.16.1 for a long time and was recently updated to 1.17.1. It
ran fine for a few days, but suddenly all checkpoints started failing
(RocksDB + Azure ABFSS + Kubernetes HA). I don't see anything interesting
in the logs right before the problem started, just afterwards:

Jul 9, 2023 @
18:13:41.271 org.apache.flink.runtime.checkpoint.CheckpointCoordinator
Completed
checkpoint 187398 for job 3d85035b76921c0a905f6c4fade06eca (19891956 bytes,
checkpointDuration=443 ms, finalizationTime=103 ms).
Jul 9, 2023 @
18:14:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator
Triggering
checkpoint 187399 (type=CheckpointType{name='Checkpoint',
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919280725 for job
3d85035b76921c0a905f6c4fade06eca.
Jul 9, 2023 @
18:15:05.472 org.apache.kafka.clients.NetworkClient [AdminClient
clientId=flink-enumerator-admin-client] Node 0 disconnected.
Jul 9, 2023 @
18:15:10.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator
Checkpoint
187399 of job 3d85035b76921c0a905f6c4fade06eca expired before completing.
Jul 9, 2023 @
18:15:10.741 org.apache.flink.runtime.checkpoint.CheckpointFailureManager
Failed
to trigger or complete checkpoint 187399 for job
3d85035b76921c0a905f6c4fade06eca. (0 consecutive failed attempts so far)
Jul 9, 2023 @
18:15:10.905 org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable
asynchronous
part of checkpoint 187399 could not be completed.
Jul 9, 2023 @
18:15:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator
Triggering
checkpoint 187400 (type=CheckpointType{name='Checkpoint',
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919340725 for job
3d85035b76921c0a905f6c4fade06eca.
Jul 9, 2023 @
18:15:40.957 org.apache.flink.runtime.state.SharedStateRegistryImpl Duplicated
registration under key
ec5b73d0-a04c-4574-b380-7981c7173d80-KeyGroupRange{startKeyGroup=60,
endKeyGroup=119}-016511.sst of a new state:
ByteStreamStateHandle{handleName='abfss://.../checkpoints/3d85035b76921c0a905f6c4fade06eca/shared/2eddc140-51c8-4575-899f-e70ca71f95be',
dataBytes=1334}. This might happen during the task failover if state
backend creates different states with the same key before and after the
failure. Discarding the OLD state and keeping the NEW one which is included
into a completed checkpoint

This last line appeared multiple times, and after that all checkpoints
failed. At some point, this exception also started appearing:

org.apache.flink.runtime.jobmaster.JobMaster Error while processing
AcknowledgeCheckpoint message
org.apache.flink.runtime.checkpoint.CheckpointException: Could not
finalize the pending checkpoint 188392. Failure reason: Failure to finalize
checkpoint.
...
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
FileSystem for scheme "file"

In the Kubernetes HA CM I can see that the value under ".data.counter" was
still increasing, not sure if that's expected, but since we configured only
1 allowable consecutive checkpoint failure, the job kept restarting every 2
minutes. Trying to create savepoints also failed with
UnsupportedFileSystemException.
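
For reference, the failure tolerance I mention corresponds to a setting along
these lines; the checkpoint interval below is illustrative, and the same thing
can be expressed with execution.checkpointing.tolerable-failed-checkpoints:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointToleranceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // illustrative interval
        // Only one consecutive checkpoint failure is tolerated before the job fails over.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
        // ... rest of the pipeline ...
        env.execute("checkpoint-tolerance-sketch");
    }
}
```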

Manually deleting the Flink cluster's pods was enough to bring it back to a
working state. Maybe the handling for this error is not working correctly?

Regards,
Alexis.


Re: Kafka source with idleness and alignment stops consuming

2023-06-29 Thread Alexis Sarda-Espinosa
BTW, it seems I spoke too soon in my previous email. I left the job running
overnight with each source having its own alignment group to evaluate only
per-split alignment, and I can see that eventually some partitions never
resumed consumption and the consumer lag increased.

Regards,
Alexis.

On Thu, Jun 29, 2023 at 10:08 AM, Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Martijn,
>
> thanks for the pointers. I think the issue I'm seeing is not caused by
> those because in my case the watermarks are not negative. Some more
> information from my setup in case it's relevant:
>
> - All Kafka topics have 6 partitions.
> - Job parallelism is 2, but 2 of the Kafka sources are hard-coded to
> parallelism=1.
>
> Regards,
> Alexis.
>
> On Thu, Jun 29, 2023 at 10:00 AM, Martijn Visser <
> martijnvis...@apache.org> wrote:
>
>> Hi Alexis,
>>
>> There are a couple of recent Flink tickets on watermark alignment,
>> specifically https://issues.apache.org/jira/browse/FLINK-32414 and
>> https://issues.apache.org/jira/browse/FLINK-32420 - Could the later be
>> also applicable in your case?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> just for completeness, I don't see the problem if I assign a different
>>> alignment group to each source, i.e. using only split-level watermark
>>> alignment.
>>>
>>> Regards,
>>> Alexis.
>>>
> On Wed, Jun 28, 2023 at 8:13 AM, haishui  wrote:
>>>
>>>> Hi,
>>>> I have the same trouble. This is really a bug.
>>>> `shouldWaitForAlignment` needs to be changed as well.
>>>>
>>>> By the way, a source will be marked as idle when the source has been
>>>> waiting for alignment for a long time. Is this a bug?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa" 
>>>> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am currently evaluating idleness and alignment with Flink 1.17.1 and
>>>> the externalized Kafka connector. My job has 3 sources whose watermark
>>>> strategies are defined like this:
>>>>
>>>> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>>>> .withIdleness(idleTimeout)
>>>> .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
>>>> Duration.ofSeconds(1L))
>>>>
>>>> The max allowed drift is currently 5 seconds, and my sources have an
>>>> idleTimeout of 1, 1.5, and 5 seconds.
>>>>
>>>> What I observe is that, when I restart the job, all sources publish
>>>> messages, but then 2 of them are marked as idle and never resume. I found
>>>> https://issues.apache.org/jira/browse/FLINK-31632, which should be
>>>> fixed in 1.17.1, but I don't think it's the same issue, my logs don't show
>>>> negative values:
>>>>
>>>> 2023-06-27 15:11:42,927 DEBUG
>>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>>> reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
>>>> subTaskId=1
>>>> 2023-06-27 15:11:43,009 DEBUG
>>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>>> 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,091 DEBUG
>>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>>> 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,116 DEBUG
>>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>>> 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,298 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
>>>> 2023-06-27 15:11:43,304 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>>> 2023-06-27 15:11:43,306 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>>> 2023-06-27 15:11:43,486 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>> 2023-06-27 15:11:43,489 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>> 2023-06-27 15:11:43,492 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>>
>>>> Does anyone know if I'm missing something or this is really a bug?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>


Re: Kafka source with idleness and alignment stops consuming

2023-06-29 Thread Alexis Sarda-Espinosa
Hi Martijn,

thanks for the pointers. I think the issue I'm seeing is not caused by
those because in my case the watermarks are not negative. Some more
information from my setup in case it's relevant:

- All Kafka topics have 6 partitions.
- Job parallelism is 2, but 2 of the Kafka sources are hard-coded to
parallelism=1.

Regards,
Alexis.

On Thu, Jun 29, 2023 at 10:00 AM, Martijn Visser <
martijnvis...@apache.org> wrote:

> Hi Alexis,
>
> There are a couple of recent Flink tickets on watermark alignment,
> specifically https://issues.apache.org/jira/browse/FLINK-32414 and
> https://issues.apache.org/jira/browse/FLINK-32420 - Could the later be
> also applicable in your case?
>
> Best regards,
>
> Martijn
>
> On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> just for completeness, I don't see the problem if I assign a different
>> alignment group to each source, i.e. using only split-level watermark
>> alignment.
>>
>> Regards,
>> Alexis.
>>
>> On Wed, Jun 28, 2023 at 8:13 AM, haishui  wrote:
>>
>>> Hi,
>>> I have the same trouble. This is really a bug.
>>> `shouldWaitForAlignment` needs to be changed as well.
>>>
>>> By the way, a source will be marked as idle when the source has been
>>> waiting for alignment for a long time. Is this a bug?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa" 
>>> wrote:
>>>
>>> Hello,
>>>
>>> I am currently evaluating idleness and alignment with Flink 1.17.1 and
>>> the externalized Kafka connector. My job has 3 sources whose watermark
>>> strategies are defined like this:
>>>
>>> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>>> .withIdleness(idleTimeout)
>>> .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
>>> Duration.ofSeconds(1L))
>>>
>>> The max allowed drift is currently 5 seconds, and my sources have an
>>> idleTimeout of 1, 1.5, and 5 seconds.
>>>
>>> What I observe is that, when I restart the job, all sources publish
>>> messages, but then 2 of them are marked as idle and never resume. I found
>>> https://issues.apache.org/jira/browse/FLINK-31632, which should be
>>> fixed in 1.17.1, but I don't think it's the same issue, my logs don't show
>>> negative values:
>>>
>>> 2023-06-27 15:11:42,927 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
>>> subTaskId=1
>>> 2023-06-27 15:11:43,009 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>> 07:12:55.807) from subTaskId=0
>>> 2023-06-27 15:11:43,091 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>> 07:12:55.807) from subTaskId=0
>>> 2023-06-27 15:11:43,116 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>> 07:12:55.807) from subTaskId=0
>>> 2023-06-27 15:11:43,298 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
>>> 2023-06-27 15:11:43,304 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>> 2023-06-27 15:11:43,306 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>> 2023-06-27 15:11:43,486 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>> 2023-06-27 15:11:43,489 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>> 2023-06-27 15:11:43,492 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>
>>> Does anyone know if I'm missing something or this is really a bug?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Re: Kafka source with idleness and alignment stops consuming

2023-06-28 Thread Alexis Sarda-Espinosa
Hello,

just for completeness, I don't see the problem if I assign a different
alignment group to each source, i.e. using only split-level watermark
alignment.

Regards,
Alexis.

On Wed, Jun 28, 2023 at 8:13 AM, haishui  wrote:

> Hi,
> I have the same trouble. This is really a bug.
> `shouldWaitForAlignment` needs to be changed as well.
>
> By the way, a source will be marked as idle when the source has been
> waiting for alignment for a long time. Is this a bug?
>
>
>
>
>
>
> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa" 
> wrote:
>
> Hello,
>
> I am currently evaluating idleness and alignment with Flink 1.17.1 and the
> externalized Kafka connector. My job has 3 sources whose watermark
> strategies are defined like this:
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
> .withIdleness(idleTimeout)
> .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
> Duration.ofSeconds(1L))
>
> The max allowed drift is currently 5 seconds, and my sources have an
> idleTimeout of 1, 1.5, and 5 seconds.
>
> What I observe is that, when I restart the job, all sources publish
> messages, but then 2 of them are marked as idle and never resume. I found
> https://issues.apache.org/jira/browse/FLINK-31632, which should be fixed
> in 1.17.1, but I don't think it's the same issue, my logs don't show
> negative values:
>
> 2023-06-27 15:11:42,927 DEBUG
> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
> reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
> subTaskId=1
> 2023-06-27 15:11:43,009 DEBUG
> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
> 07:12:55.807) from subTaskId=0
> 2023-06-27 15:11:43,091 DEBUG
> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
> 07:12:55.807) from subTaskId=0
> 2023-06-27 15:11:43,116 DEBUG
> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
> 07:12:55.807) from subTaskId=0
> 2023-06-27 15:11:43,298 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
> 2023-06-27 15:11:43,304 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
> 2023-06-27 15:11:43,306 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
> 2023-06-27 15:11:43,486 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
> 2023-06-27 15:11:43,489 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
> 2023-06-27 15:11:43,492 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>
> Does anyone know if I'm missing something or this is really a bug?
>
> Regards,
> Alexis.
>
>


Kafka source with idleness and alignment stops consuming

2023-06-27 Thread Alexis Sarda-Espinosa
Hello,

I am currently evaluating idleness and alignment with Flink 1.17.1 and the
externalized Kafka connector. My job has 3 sources whose watermark
strategies are defined like this:

WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
.withIdleness(idleTimeout)
.withWatermarkAlignment("group", maxAllowedWatermarkDrift,
Duration.ofSeconds(1L))

The max allowed drift is currently 5 seconds, and my sources have an
idleTimeout of 1, 1.5, and 5 seconds.
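
For completeness, each strategy is attached to its source roughly like this;
broker, topic, group id and deserializer are placeholders for my actual setup:

```
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")   // placeholder
                .setTopics("topic-a")                // placeholder
                .setGroupId("alignment-test")        // placeholder
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        WatermarkStrategy<String> strategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                .withIdleness(Duration.ofSeconds(1L))
                // all sources share the alignment group "group"
                .withWatermarkAlignment("group", Duration.ofSeconds(5L), Duration.ofSeconds(1L));

        env.fromSource(source, strategy, "topic-a-source").print();
        env.execute("alignment-sketch");
    }
}
```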

What I observe is that, when I restart the job, all sources publish
messages, but then 2 of them are marked as idle and never resume. I found
https://issues.apache.org/jira/browse/FLINK-31632, which should be fixed in
1.17.1, but I don't think it's the same issue, my logs don't show negative
values:

2023-06-27 15:11:42,927 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
subTaskId=1
2023-06-27 15:11:43,009 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
07:12:55.807) from subTaskId=0
2023-06-27 15:11:43,091 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
07:12:55.807) from subTaskId=0
2023-06-27 15:11:43,116 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
07:12:55.807) from subTaskId=0
2023-06-27 15:11:43,298 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
2023-06-27 15:11:43,304 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
2023-06-27 15:11:43,306 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
2023-06-27 15:11:43,486 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
2023-06-27 15:11:43,489 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
2023-06-27 15:11:43,492 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]

Does anyone know if I'm missing something or this is really a bug?

Regards,
Alexis.


Re: Interaction between idling sources and watermark alignment

2023-06-16 Thread Alexis Sarda-Espinosa
Thank you very much for the explanation, Hong.

On Thu, 15 Jun 2023, 15:55 Teoh, Hong,  wrote:

> Hi Alexis, below is my understanding:
>
>
> > I see that, in Flink 1.17.1, watermark alignment will be supported (as
> beta) within a single source's splits and across different sources. I don't
> see this explicitly mentioned in the documentation, but I assume that the
> concept of "maximal drift" used for alignment also takes idleness into
> account, resuming any readers that were paused due to an idle split or
> source. Is my understanding correct?
>
> As far as I understand, the evaluation to “unpause” a given split that
> might have been paused due to watermark alignment is evaluated at fixed
> intervals here. [1]
>
> We see that the SourceCoordinator calls announceCombinedWatermark() that
> calculates the global watermark and that subsequently sends
> a WatermarkAlignmentEvent to each subtask. On each subtask, there is an
> evaluation of whether to “wake up” the operator. [2] [3]
>
> This means that there is a periodic evaluation of whether to “wake up”,
> controlled by the update interval, which defaults to 1s [4]
>
> > Also, something that isn't 100% clear to me when comparing to the
> previous watermark alignment documentation, even if I only wanted alignment
> within a single source's splits, I still need to
> call withWatermarkAlignment in the watermark strategy, right? Otherwise
> alignment will not take place, regardless
> of pipeline.watermark-alignment.allow-unaligned-source-splits.
>
> Yes, this is correct. Watermark groups are used to check whether multiple
> sources need to coordinate watermarks. If two sources A and B both belong
> to the same watermark group, then their watermarks will be aligned.
>
> Hope the above helps.
>
>
> Cheers,
> Hong
>
>
> [1]
> https://github.com/apache/flink/blob/45ba7ee87caee63a0babfd421b7c5eabaa779baa/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L160
> [2]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L556-L559
> [3]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L659
> [4]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithWatermarkAlignment.java#L29
>
>
>
> On 13 Jun 2023, at 21:08, Alexis Sarda-Espinosa 
> wrote:
>
> Hi again, I'm not a fan of bumping questions, but I think this might be
> relevant, maybe enough to include it in the official documentation?
>
> Regards,
> Alexis.
>
> On Tue, 30 May 2023, 16:07 Alexis Sarda-Espinosa, <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I see that, in Flink 1.17.1, watermark alignment will be supported (as
>> beta) within a single source's splits and across different sources. I don't
>> see this explicitly mentioned in the documentation, but I assume that the
>> concept of "maximal drift" used for alignment also takes idleness into
>> account, resuming any readers that were paused due to an idle split or
>> source. Is my understanding correct?
>>
>> Also, something that isn't 100% clear to me when comparing to the
>> previous watermark alignment documentation, even if I only wanted alignment
>> within a single source's splits, I still need to
>> call withWatermarkAlignment in the watermark strategy, right? Otherwise
>> alignment will not take place, regardless
>> of pipeline.watermark-alignment.allow-unaligned-source-splits.
>>
>> Regards,
>> Alexis.
>>
>>
>


Re: Interaction between idling sources and watermark alignment

2023-06-13 Thread Alexis Sarda-Espinosa
Hi again, I'm not a fan of bumping questions, but I think this might be
relevant, maybe enough to include it in the official documentation?

Regards,
Alexis.

On Tue, 30 May 2023, 16:07 Alexis Sarda-Espinosa, 
wrote:

> Hello,
>
> I see that, in Flink 1.17.1, watermark alignment will be supported (as
> beta) within a single source's splits and across different sources. I don't
> see this explicitly mentioned in the documentation, but I assume that the
> concept of "maximal drift" used for alignment also takes idleness into
> account, resuming any readers that were paused due to an idle split or
> source. Is my understanding correct?
>
> Also, something that isn't 100% clear to me when comparing to the previous
> watermark alignment documentation, even if I only wanted alignment within a
> single source's splits, I still need to call withWatermarkAlignment in the
> watermark strategy, right? Otherwise alignment will not take place,
> regardless of pipeline.watermark-alignment.allow-unaligned-source-splits.
>
> Regards,
> Alexis.
>
>


Re: RocksDB segfault on state restore

2023-06-02 Thread Alexis Sarda-Espinosa
Hello,

A couple of potentially relevant pieces of information:

1. https://issues.apache.org/jira/browse/FLINK-16686
2. https://stackoverflow.com/a/64721838/5793905 (question was about schema
evolution, but the answer is more generally applicable)
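
For readers landing here later, the combination discussed in this thread (list
state with TTL cleanup in RocksDB's compaction filter) is typically configured
along these lines; the state name, type and TTL are only illustrative:

```
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

public class TtlListStateSketch {
    public static ListStateDescriptor<String> descriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(7))
                // enables the FlinkCompactionFilter that shows up in the stack trace
                .cleanupInRocksdbCompactFilter(1000L)
                .build();

        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("events", Types.STRING);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```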

Regards,
Alexis.

On Fri, Jun 2, 2023 at 7:18 AM, Gyula Fóra  wrote:

> Hi!
>
>
> In our case, no schema evolution was triggered , only the TTL was set from
> the beginning as far as I remember.
>
> I will double check
>
> Gyula
>
> On Fri, 2 Jun 2023 at 06:12, Hangxiang Yu  wrote:
>
>> Hi, Gyula.
>> It seems related to https://issues.apache.org/jira/browse/FLINK-23346.
>> We also saw core dump while using list state after triggering state
>> migration and ttl compaction filter. Have you triggered the schema
>> evolution ?
>> It seems a bug of the rocksdb list state together with ttl compaction
>> filter.
>>
>> On Wed, May 17, 2023 at 7:05 PM Gyula Fóra  wrote:
>>
>>> Hi All!
>>>
>>> We are encountering an error on a larger stateful job (around 1 TB +
>>> state) on restore from a rocksdb checkpoint. The taskmanagers keep crashing
>>> with a segfault coming from the rocksdb native logic and seem to be related
>>> to the FlinkCompactionFilter mechanism.
>>>
>>> The gist with the full error report:
>>> https://gist.github.com/gyfora/f307aa570d324d063e0ade9810f8bb25
>>>
>>> The core part is here:
>>> V  [libjvm.so+0x79478f]  Exceptions::
>>> (Thread*, char const*, int, oopDesc*)+0x15f
>>> V  [libjvm.so+0x960a68]  jni_Throw+0x88
>>> C  [librocksdbjni-linux64.so+0x222aa1]
>>>  JavaListElementFilter::NextUnexpiredOffset(rocksdb::Slice const&, long,
>>> long) const+0x121
>>> C  [librocksdbjni-linux64.so+0x6486c1]
>>>  rocksdb::flink::FlinkCompactionFilter::ListDecide(rocksdb::Slice const&,
>>> std::string*) const+0x81
>>> C  [librocksdbjni-linux64.so+0x648bea]
>>>  rocksdb::flink::FlinkCompactionFilter::FilterV2(int, rocksdb::Slice
>>> const&, rocksdb::CompactionFilter::ValueType, rocksdb::Slice const&,
>>> std::string*, std::string*) const+0x14a
>>>
>>> Has anyone encountered a similar issue before?
>>>
>>> Thanks
>>> Gyula
>>>
>>>
>>
>> --
>> Best,
>> Hangxiang.
>>
>


Interaction between idling sources and watermark alignment

2023-05-30 Thread Alexis Sarda-Espinosa
Hello,

I see that, in Flink 1.17.1, watermark alignment will be supported (as
beta) within a single source's splits and across different sources. I don't
see this explicitly mentioned in the documentation, but I assume that the
concept of "maximal drift" used for alignment also takes idleness into
account, resuming any readers that were paused due to an idle split or
source. Is my understanding correct?

Also, something that isn't 100% clear to me when comparing to the previous
watermark alignment documentation, even if I only wanted alignment within a
single source's splits, I still need to call withWatermarkAlignment in the
watermark strategy, right? Otherwise alignment will not take place,
regardless of pipeline.watermark-alignment.allow-unaligned-source-splits.

Regards,
Alexis.


Kubernetes operator stops responding due to Connection reset by peer

2023-04-21 Thread Alexis Sarda-Espinosa
Hello,

Today, we received an alert because the operator appeared to be down. Upon
further investigation, we realized the alert was triggered because the
endpoint for Prometheus metrics (which we enabled) stopped responding, so
it seems the endpoint used for the liveness probe wasn't affected and the
pod was not restarted automatically.

The logs right before the problem started don't show anything odd, and once
the problem started, the logs were spammed with warning messages stating
"Connection reset by peer" with no further information. From what I can
see, nothing else was logged during that time, so it looks like the process
really had stalled.

I imagine this is not easy to reproduce and, while a pod restart was enough
to get back on track, it might be worth improving the liveness probe to
catch these situations.

Full stacktrace for reference:

An exceptionCaught() event was fired, and it reached at the tail of the
pipeline. It usually means the last handler in the pipeline did not handle
the exception.
java.io.IOException: Connection reset by peer at
java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method) at
java.base/sun.nio.ch.SocketDispatcher.read(Unknown Source) at
java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source) at
java.base/sun.nio.ch.IOUtil.read(Unknown Source) at
java.base/sun.nio.ch.IOUtil.read(Unknown Source) at
java.base/sun.nio.ch.SocketChannelImpl.read(Unknown Source) at
org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledDirectByteBuf.setBytes(UnpooledDirectByteBuf.java:570)
at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)

Regards,
Alexis.


Requirements for POJO serialization

2023-04-11 Thread Alexis Sarda-Espinosa
Hello,

according to the documentation, a POJO must have a no-arg constructor and
either public fields or public getters and setters with conventional
naming. I recently realized that if I create an explicit TypeInfoFactory
that provides Types.POJO and all other required details, the getters and
setters aren't needed. Is this an official feature?

I ask because this means some classes could have an "immutable contract",
so to speak. I'm guessing final fields might still be unsupported, but I
haven't validated.
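
To make the question concrete, this is the kind of setup I mean; the class and
its fields are just an example:

```
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

@TypeInfo(Measurement.Factory.class)
public class Measurement {
    // private fields, no getters or setters; only the no-arg constructor remains
    private String sensorId;
    private long timestamp;

    public Measurement() {}

    public static class Factory extends TypeInfoFactory<Measurement> {
        @Override
        public TypeInformation<Measurement> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            Map<String, TypeInformation<?>> fields = new HashMap<>();
            fields.put("sensorId", Types.STRING);
            fields.put("timestamp", Types.LONG);
            // Types.POJO with an explicit field map is what I mean by
            // "all other required details"
            return Types.POJO(Measurement.class, fields);
        }
    }
}
```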

Regards,
Alexis.


Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)

2023-03-30 Thread Alexis Sarda-Espinosa
Hi Martijn,

just to be sure, if all state-related classes use a POJO serializer, Kryo
will never come into play, right? Given FLINK-16686 [1], I wonder how many
users actually have jobs with Kryo and RocksDB, but even if there aren't
many, that still leaves those who don't use RocksDB for
checkpoints/savepoints.

If Kryo were to stay in the Flink APIs in v1.X, is it impossible to let
users choose between v2/v5 jars by separating them like log4j2 jars?
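
(Side note: one way to sanity-check that assumption in a given job is to
disable the generic-type fallback so it fails fast wherever Kryo would
actually be used; a minimal sketch:)

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the job will throw if any type would have to fall back to Kryo/GenericTypeInfo
env.getConfig().disableGenericTypes();
```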

[1] https://issues.apache.org/jira/browse/FLINK-16686

Regards,
Alexis.

Am Do., 30. März 2023 um 14:26 Uhr schrieb Martijn Visser <
martijnvis...@apache.org>:

> Hi all,
>
> I also saw a thread on this topic from Clayton Wohl [1] on this topic,
> which I'm including in this discussion thread to avoid that it gets lost.
>
> From my perspective, there's two main ways to get to Java 17:
>
> 1. The Flink community agrees that we upgrade Kryo to a later version,
> which means breaking all checkpoint/savepoint compatibility and releasing a
> Flink 2.0 with Java 17 support added and Java 8 and Flink Scala API support
> dropped. This is probably the quickest way, but would still mean that we
> expose Kryo in the Flink APIs, which is the main reason why we haven't been
> able to upgrade Kryo at all.
> 2. There's a contributor who makes a contribution that bumps Kryo, but
> either a) automagically reads in all old checkpoints/savepoints in using
> Kryo v2 and writes them to new snapshots using Kryo v5 (like is mentioned
> in the Kryo migration guide [2][3] or b) provides an offline tool that
> allows users that are interested in migrating their snapshots manually
> before starting from a newer version. That potentially could prevent the
> need to introduce a new Flink major version. In both scenarios, ideally the
> contributor would also help with avoiding the exposure of Kryo so that we
> will be in a better shape in the future.
>
> It would be good to get the opinion of the community for either of these
> two options, or potentially for another one that I haven't mentioned. If it
> appears that there's an overall agreement on the direction, I would propose
> that a FLIP gets created which describes the entire process.
>
> Looking forward to the thoughts of others, including the Users (therefore
> including the User ML).
>
> Best regards,
>
> Martijn
>
> [1]  https://lists.apache.org/thread/qcw8wy9dv8szxx9bh49nz7jnth22p1v2
> [2] https://lists.apache.org/thread/gv49jfkhmbshxdvzzozh017ntkst3sgq
> [3] https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5
>
> On Sun, Mar 19, 2023 at 8:16 AM Tamir Sagi 
> wrote:
>
>> I agree, there are several options to mitigate the migration from v2 to
>> v5.
>> yet, Oracle roadmap is to end JDK 11 support in September this year.
>>
>>
>>
>> 
>> From: ConradJam 
>> Sent: Thursday, March 16, 2023 4:36 AM
>> To: d...@flink.apache.org 
>> Subject: Re: [Discussion] - Release major Flink version to support JDK 17
>> (LTS)
>>
>> EXTERNAL EMAIL
>>
>>
>>
>> Thanks for your start this discuss
>>
>>
>> I have been tracking this problem for a long time, until I saw a
>> conversation in ISSUSE a few days ago and learned that the Kryo version
>> problem will affect the JDK17 compilation of snapshots [1] FLINK-24998 ,
>>
>> As @cherry said it ruined our whole effort towards JDK17
>>
>> I am in favor of providing an external tool to migrate from Kryo old
>> version checkpoint to the new Kryo new checkpoint at one time (Maybe this
>> tool start in flink 2.0 ?), does this tool currently have any plans or
>> ideas worth discuss
>>
>>
>> I think it should not be difficult to be compatible with JDK11 and JDK17.
>> We should indeed abandon JDK8 in 2.0.0. It is also mentioned in the doc
>> that it is marked as Deprecated [2]
>>
>>
>> Here I add that we need to pay attention to the version of Scala and the
>> version of JDK17
>>
>>
>> [1] FLINK-24998  IGSEGV in Kryo / C2 CompilerThread on Java 17
>> https://issues.apache.org/jira/browse/FLINK-24998
>>
>> [2] FLINK-30501 Update Flink build instruction to deprecate Java 8 instead
>> of requiring Java 11  https://issues.apache.org/jira/browse/FLINK-30501
>>
>> Tamir Sagi  于2023年3月16日周四 00:54写道:
>>
>> > Hey dev community,
>> >
>> > I'm writing this email to kick off a discussion following this epic:
>> > FLINK-15736.
>> >
>> > We are moving towards JDK 17 (LTS) , the only blocker now is Flink which
>> > currently remains on JDK 11 (LTS). Flink does not support JDK 17 yet,
>> with
>> > no timeline,  the reason, based on the aforementioned ticket is the
>> > following tickets
>> >
>> >   1.  FLINK-24998 - SIGSEGV in Kryo / C2 CompilerThread on Java 17<
>> > https://issues.apache.org/jira/browse/FLINK-24998>.
>> >   2.  FLINK-3154 - Update Kryo version from 2.24.0 to latest Kryo LTS
>> > version
>> >
>> > My question is whether it is possible to release a major version 

Re: Watermarks lagging behind events that generate them

2023-03-15 Thread Alexis Sarda-Espinosa
Hi Shammon, thanks for the info. I was hoping the savepoint would include
the watermark, but I'm not sure that would make sense in every scenario.

Regards,
Alexis.

Am Di., 14. März 2023 um 12:59 Uhr schrieb Shammon FY :

> Hi Alexis
>
> In some watermark generators such as BoundedOutOfOrderTimestamps,
> the timestamp of watermark will be reset to Long.MIN_VALUE if the subtask
> is restarted and no event from source is processed.
>
> Best,
> Shammon FY
>
> On Tue, Mar 14, 2023 at 4:58 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi David, thanks for the answer. One follow-up question: will the
>> watermark be reset to Long.MIN_VALUE every time I restart a job with
>> savepoint?
>>
>> Am Di., 14. März 2023 um 05:08 Uhr schrieb David Anderson <
>> dander...@apache.org>:
>>
>>> Watermarks always follow the corresponding event(s). I'm not sure why
>>> they were designed that way, but that is how they are implemented.
>>> Windows maintain this contract by emitting all of their results before
>>> forwarding the watermark that triggered the results.
>>>
>>> David
>>>
>>> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY  wrote:
>>> >
>>> > Hi Alexis
>>> >
>>> > Do you use both event-time watermark generator and TimerService for
>>> processing time in your job? Maybe you can try using event-time watermark
>>> first.
>>> >
>>> > Best,
>>> > Shammon.FY
>>> >
>>> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> I recently ran into a weird issue with a streaming job in Flink
>>> 1.16.1. One of my functions (KeyedProcessFunction) has been using
>>> processing time timers. I now want to execute the same job based on a
>>> historical data dump, so I had to adjust the logic to use event time timers
>>> in that case (and did not use BATCH execution mode). Since my data has a
>>> timestamp field, I implemented a custom WatermarkGenerator that always
>>> emits a watermark with that timestamp in the onEvent callback, and does
>>> nothing in the onPeriodicEmit callback.
>>> >>
>>> >> My problem is that, sometimes, the very first time my function calls
>>> TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
>>> some false triggers when the first watermark actually arrives.
>>> >>
>>> >> I would have expected that, if WatermarkGenerator.onEvent emits a
>>> watermark, it would be sent before the corresponding event, but maybe this
>>> is not always the case?
>>> >>
>>> >> In case it's relevant, a brief overview of my job's topology:
>>> >>
>>> >> Source1 -> Broadcast
>>> >>
>>> >> Source2 ->
>>> >>   keyBy ->
>>> >>   connect(Broadcast) ->
>>> >>   process ->
>>> >>   filter ->
>>> >>   assignTimestampsAndWatermarks -> // newly added for historical data
>>> >>   keyBy ->
>>> >>   process // function that uses timers
>>> >>
>>> >> Regards,
>>> >> Alexis.
>>>
>>


Re: Watermarks lagging behind events that generate them

2023-03-14 Thread Alexis Sarda-Espinosa
Hi David, thanks for the answer. One follow-up question: will the watermark
be reset to Long.MIN_VALUE every time I restart a job with savepoint?

Am Di., 14. März 2023 um 05:08 Uhr schrieb David Anderson <
dander...@apache.org>:

> Watermarks always follow the corresponding event(s). I'm not sure why
> they were designed that way, but that is how they are implemented.
> Windows maintain this contract by emitting all of their results before
> forwarding the watermark that triggered the results.
>
> David
>
> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY  wrote:
> >
> > Hi Alexis
> >
> > Do you use both event-time watermark generator and TimerService for
> processing time in your job? Maybe you can try using event-time watermark
> first.
> >
> > Best,
> > Shammon.FY
> >
> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
> >>
> >> Hello,
> >>
> >> I recently ran into a weird issue with a streaming job in Flink 1.16.1.
> One of my functions (KeyedProcessFunction) has been using processing time
> timers. I now want to execute the same job based on a historical data dump,
> so I had to adjust the logic to use event time timers in that case (and did
> not use BATCH execution mode). Since my data has a timestamp field, I
> implemented a custom WatermarkGenerator that always emits a watermark with
> that timestamp in the onEvent callback, and does nothing in the
> onPeriodicEmit callback.
> >>
> >> My problem is that, sometimes, the very first time my function calls
> TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
> some false triggers when the first watermark actually arrives.
> >>
> >> I would have expected that, if WatermarkGenerator.onEvent emits a
> watermark, it would be sent before the corresponding event, but maybe this
> is not always the case?
> >>
> >> In case it's relevant, a brief overview of my job's topology:
> >>
> >> Source1 -> Broadcast
> >>
> >> Source2 ->
> >>   keyBy ->
> >>   connect(Broadcast) ->
> >>   process ->
> >>   filter ->
> >>   assignTimestampsAndWatermarks -> // newly added for historical data
> >>   keyBy ->
> >>   process // function that uses timers
> >>
> >> Regards,
> >> Alexis.
>


Watermarks lagging behind events that generate them

2023-03-10 Thread Alexis Sarda-Espinosa
Hello,

I recently ran into a weird issue with a streaming job in Flink 1.16.1. One
of my functions (KeyedProcessFunction) has been using processing time
timers. I now want to execute the same job based on a historical data dump,
so I had to adjust the logic to use event time timers in that case (and did
*not* use BATCH execution mode). Since my data has a timestamp field, I
implemented a custom WatermarkGenerator that always emits a watermark with
that timestamp in the onEvent callback, and does nothing in the
onPeriodicEmit callback.
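
The generator is essentially the following sketch; MyEvent and its timestamp
accessor stand in for my actual type:

```java
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// plugged in via WatermarkStrategy.forGenerator(ctx -> new PerEventWatermarks())
//     .withTimestampAssigner((event, ts) -> event.getTimestamp())
public class PerEventWatermarks implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // emit a watermark for every single event, using the event's own timestamp
        output.emitWatermark(new Watermark(eventTimestamp));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // intentionally empty: watermarks are only emitted per event
    }
}
```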

My problem is that, sometimes, the very first time my function calls
TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
some false triggers when the first watermark actually arrives.

I would have expected that, if WatermarkGenerator.onEvent emits a
watermark, it would be sent before the corresponding event, but maybe this
is not always the case?

In case it's relevant, a brief overview of my job's topology:

Source1 -> Broadcast

Source2 ->
  keyBy ->
  connect(Broadcast) ->
  process ->
  filter ->
  assignTimestampsAndWatermarks -> // newly added for historical data
  keyBy ->
  process // function that uses timers

Regards,
Alexis.


Re: [EXTERNAL] Re: Secure Azure Credential Configuration

2023-03-06 Thread Alexis Sarda-Espinosa
Hello,

I actually needed this myself so I have validated it. Again, this is if you
want Flink itself to access Azure, and I'm fairly certain you have to use
Java because the plugin's class loader won't have access to the Scala
library's jars.

* You have to build against
https://mvnrepository.com/artifact/org.apache.flink/flink-azure-fs-hadoop/1.16.1
(mark it as provided).
* You should implement the azurebfs provider for ABFS.
* You can create 1 plugin folder and copy Flink's azure jar plus the one
with your interface implementation.

I can confirm that worked.
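
For reference, the provider boils down to a single method. A minimal sketch
(the class name and environment variable are made up, and the exact shaded
package prefix should be double-checked against the jar of your Flink
version):

```java
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.services.KeyProvider;

public class EnvKeyProvider implements KeyProvider {

    @Override
    public String getStorageAccountKey(String accountName, Configuration rawConfig) {
        // read the account key from the environment instead of flink-conf.yaml
        return System.getenv("AZURE_STORAGE_KEY");
    }
}
```

The implementation is then referenced from the fs.azure.account.keyprovider.*
property for the account (double-check the exact key name for ABFS in the
Hadoop docs).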

Regards,
Alexis.

On Tue, 7 Mar 2023, 06:47 Swathi C,  wrote:

> Hi Ivan,
>
> You can try to setup using MSI so that the flink pods access the storage
> account and you might need to add the podIdentity to the flink pod so that
> it can access it. ( MSI should have the access for the storage account as
> well )
> The pod identity will have the required permissions to access to the
> storage account. These changes might be required along with adding the
> plugins.
> Can you try adding the following to the flink-config ?
>
> fs.azure.account.auth.type: OAuth
> fs.azure.account.oauth2.msi.tenant: 
> fs.azure.account.oauth.provider.type:
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
> fs.azure.account.oauth2.client.id: 
> fs.azure.identity.transformer.service.principal.substitution.list: '*'
> fs.azure.identity.transformer.service.principal.id: 
>
> Regards,
> Swathi C
>
> On Tue, Mar 7, 2023 at 8:53 AM Ivan Webber via user 
> wrote:
>
>> Thanks for the pointers Alexis!
>>
>>
>>
>> Implementing `org.apache.hadoop.fs.azure.KeyProvider` has helped me make
>> progress, but I’m running into a new error:
>>
>> ```
>>
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException:
>> org.tryflink.wrappers.TrafficForecastEnvKeyProviderWrapper specified in
>> config is not a valid KeyProvider class.
>>
>> ```
>>
>>
>>
>> I get this error whether I implement the class in Scala or Java, or use `
>> org.apache.hadoop.fs.azure.KeyProvider` or `
>> org.apache.hadoop.fs.azurebfs.services.KeyProvider `. My best guess is that
>> it’s something to do with not building against the shaded interface which
>> you indicated I should do or possibly different class loaders. To build
>> against the shaded interfaces would I import a package that has them?
>>
>>
>>
>> This is the dependency I added with
>> `org.apache.hadoop.fs.azure.KeyProvider`.
>>
>> ```
>>
>> 
>>
>> org.apache.hadoop
>>
>> hadoop-azure
>>
>> 3.3.2
>>
>> 
>>
>> ```
>>
>>
>>
>> What I’ve learned so far is that this configuration has more to do with
>> configuring Hadoop than Flink as the configuration is forwarded
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/#:~:text=Flink%20forwards%20all%20Flink%20configurations%20with%20a%20key%20prefix%20of%20fs.azure%20to%20the%20Hadoop%20configuration%20of%20the%20filesystem>.
>> Thus, I tried setting the properties to use Azure Managed Identity
>> <https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Azure_Managed_Identity>,
>> but got an error [1]. If anyone has gotten that to work I’d be interested
>> in hearing about it.
>>
>>
>>
>> Thanks for the help so far; please, anyone who can give pointers send
>> them.
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Ivan
>>
>>
>>
>>
>>
>> [1] -
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureException:
>> No credentials found for account myblob.blob.core.windows.net in the
>> configuration, and its container flink-forecast is not accessible using
>> anonymous credentials. Please check if the container exists first. If it is
>> not publicly available, you have to provide account credentials.
>>
>>
>>
>> *From: *Ivan Webber 
>> *Sent: *Friday, March 3, 2023 10:38 AM
>> *To: *Alexis Sarda-Espinosa 
>> *Cc: *user 
>> *Subject: *Re: [EXTERNAL] Re: Secure Azure Credential Configuration
>>
>>
>>
>> Thanks Alexis,
>>
>>
>>
>> I will be trying that out today. If it works I will share back and try
>> adding it to the docs.
>>
>>
>>
>>
>>
>> *From:* Alexis Sarda-Espinosa 
>> *Sent:* Thursday, March 2, 2023 3:33:03 PM
>> *To:* Ivan Webber 
>> *Cc:* user 
>> *Su

Re: [EXTERNAL] Re: Secure Azure Credential Configuration

2023-03-02 Thread Alexis Sarda-Espinosa
Hi Ivan,

please always include the whole distribution list since answers may help
others as well.

I would also think about implementing your own provider(s), but some things
I know:

- There are 2 different KeyProvider interfaces (which isn't explicitly
documented from what I can tell):
  * org.apache.hadoop.fs.azure.KeyProvider - WASB
  * org.apache.hadoop.fs.azurebfs.services.KeyProvider - ABFS (I think)
- Flink shades the hadoop classes
under org.apache.flink.fs.shaded.hadoop3... so you would need to implement
your providers against the shaded interfaces.
- The documentation for Flink plugins [1] shows an s3 folder with multiple
jars, so I imagine you could add a jar with your key providers to a folder
with the azure-fs jar, but I've never tested this.
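
Untested, but the folder layout I have in mind would be something like this
(names are arbitrary):

```
plugins/
  azure-custom/
    flink-azure-fs-hadoop-1.16.1.jar
    my-key-providers.jar   <- contains the KeyProvider implementation(s)
```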

However, I believe these shading and plugin details are only relevant if you
want Flink to access the Azure FS for its checkpoints and/or savepoints; if
you need to access the FS directly in your code, I imagine you're better off
including the relevant hadoop jars in your fat jar without going through
Flink's plugin system.

This is my impression, but maybe someone else can correct me if I'm wrong.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/

Regards,
Alexis.

Am Do., 2. März 2023 um 23:46 Uhr schrieb Ivan Webber <
ivan.web...@microsoft.com>:

> Hello Alexis,
>
>
>
> I was actually thinking I’d use both WASB and ABFS, but I looked at the
> source for EnvironmentVariableKeyProvider and it only reads a single
> specific environment variable where my pipeline actually needs to bring
> together data stored in different blob and ADLS accounts. I couldn’t find
> anything about providing my own KeyProvider but I considered trying it as
> an experiment at one point.
>
>
>
> *From: *Alexis Sarda-Espinosa 
> *Sent: *Thursday, March 2, 2023 2:38 PM
> *To: *Ivan Webber 
> *Cc: *user 
> *Subject: *[EXTERNAL] Re: Secure Azure Credential Configuration
>
>
>
> Hi Ivan,
>
>
>
> Mercy is always free. Are you using WASB or ABFS? I presume it's the
> latter, since that's the one that can't use EnvironmentVariableKeyProvider,
> but just to be sure.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> On Thu, 2 Mar 2023, 23:07 Ivan Webber via user, 
> wrote:
>
> TLDR: I will buy your coffee if you can help me understand to securely
> configure Azure credentials (doc page
> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/>
> for reference).
>
>
>
> I am a junior developer tasked with being the first person to learn the
> Apache Flink framework. I know that storing secrets in flink-conf.yaml in a
> container is a bad idea. I’ve tried exposing Azure storage keys as env vars
> and using `config.setString`, but those properties seem to get overridden.
> I plan on using Flink operator, so if you can show me in that context
> that’d be ideal.
>
>
>
> Thanks, and sorry for bothering everyone. I’ve just exhausted myself and
> am hopeful someone will have mercy for me. I really will Venmo you $5 for
> coffee if you want.
>
>
>
> Thanks,
>
>
>
> Ivan
>
>
>
>
>
> Larger code examples:
>
>
>
> Setting dynamic properties before executing the job doesn’t work because
> the values seem to get overridden or never forwarded.
>
> ```
>
> val config = new Configuration()
>
> config.setString("fs.azure.account.key.mystore1.blob.core.windows.net
> <https://nam06.safelinks.protection.outlook.com/?url=http%3A%2F%2Ffs.azure.account.key.mystore1.blob.core.windows.net%2F=05%7C01%7CIvan.Webber%40microsoft.com%7C6d883e0d338a47fccc9b08db1b6ebffd%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C638133934840820056%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C2000%7C%7C%7C=fgakMbTg5vFKCFRMFs1OlyfD0RIDLCMuUYB%2BlFhk7AQ%3D=0>
> ", System.getenv("KEY_1"))
>
> config.setString("fs.azure.account.key.mystore2.blob.core.windows.net
> <https://nam06.safelinks.protection.outlook.com/?url=http%3A%2F%2Ffs.azure.account.key.mystore2.blob.core.windows.net%2F=05%7C01%7CIvan.Webber%40microsoft.com%7C6d883e0d338a47fccc9b08db1b6ebffd%7C72f988bf86f141af

Re: Secure Azure Credential Configuration

2023-03-02 Thread Alexis Sarda-Espinosa
Hi Ivan,

Mercy is always free. Are you using WASB or ABFS? I presume it's the
latter, since that's the one that can't use EnvironmentVariableKeyProvider,
but just to be sure.

Regards,
Alexis.


On Thu, 2 Mar 2023, 23:07 Ivan Webber via user, 
wrote:

> TLDR: I will buy your coffee if you can help me understand to securely
> configure Azure credentials (doc page
> 
> for reference).
>
>
>
> I am a junior developer tasked with being the first person to learn the
> Apache Flink framework. I know that storing secrets in flink-conf.yaml in a
> container is a bad idea. I’ve tried exposing Azure storage keys as env vars
> and using `config.setString`, but those properties seem to get overridden.
> I plan on using Flink operator, so if you can show me in that context
> that’d be ideal.
>
>
>
> Thanks, and sorry for bothering everyone. I’ve just exhausted myself and
> am hopeful someone will have mercy for me. I really will Venmo you $5 for
> coffee if you want.
>
>
>
> Thanks,
>
>
>
> Ivan
>
>
>
>
>
> Larger code examples:
>
>
>
> Setting dynamic properties before executing the job doesn’t work because
> the values seem to get overridden or never forwarded.
>
> ```
>
> val config = new Configuration()
>
> config.setString("fs.azure.account.key.mystore1.blob.core.windows.net",
> System.getenv("KEY_1"))
>
> config.setString("fs.azure.account.key.mystore2.blob.core.windows.net",
> System.getenv("KEY_2"))
>
> config.setString("fs.azure.account.key.mystore3.blob.core.windows.net",
> System.getenv("KEY_3"))
>
> val env = environment.StreamExecutionEnvironment
> .getExecutionEnvironment(config)
>
> ```
>
>
>
> In Flink operator configuration fields can be provided as follows, but
> then I can’t commit the file with a secret inside. Ideally there would be a
> way to reference a secret but the values must be literal strings.
>
> ```
>
> spec:
>
>   flinkConfiguration:
>
> fs.azure.account.key.mystore1.blob.core.windows.net: SECRET_STRING
>
> fs.azure.account.key.mystore2.blob.core.windows.net: SECRET_STRING
>
> fs.azure.account.key.mystore3.blob.core.windows.net: SECRET_STRING
>
> ```
>
>
>
> The last possible solution I can think that I’ll be trying is putting the
> entire flink-conf.yaml into a secret, or having a different container that
> adds secrets to the flink-operator-job.yaml and then does the `kubectl
> create -f flink-operator-job.yaml` (if that’s even possible).
>


Re: Fast and slow stream sources for Interval Join

2023-02-27 Thread Alexis Sarda-Espinosa
Hi Mason,

Very interesting. Is it possible to apply both types of alignment? That is,
considering watermark skew across splits within one source and also between
different sources?
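
Just to make sure we mean the same thing, by the cross-source type I mean the
alignment groups, roughly like this (the group name and durations below are
arbitrary placeholders):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// MyEvent and its timestamp accessor are placeholders for the actual record type
WatermarkStrategy<MyEvent> strategy =
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.getTimestamp())
        // sources sharing this group pause reading when they drift more than
        // 20s ahead of the group's minimum watermark
        .withWatermarkAlignment("my-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
```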

Regards,
Alexis.

On Tue, 28 Feb 2023, 05:26 Mason Chen,  wrote:

> Hi all,
>
> It's true that the problem can be handled by caching records in state.
> However, there is an alternative using `watermark alignment` with Flink
> 1.15+ [1] which does the desired synchronization that you described while
> reducing the size of state from the former approach.
>
> To use this with two topics of different speeds, you would need to define
> two Kafka sources, each corresponding to a topic. This limitation is
> documented in [1]. This limitation is resolved in Flink 1.17 by split level
> (partition level in the case of Kafka) watermark alignment, so one Kafka
> source reading various topics can align on the partitions of the different
> topics.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_
>
> Best,
> Mason
>
> On Mon, Feb 27, 2023 at 8:11 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I had this question myself and I've seen it a few times, the answer is
>> always the same, there's currently no official way to handle it without
>> state.
>>
>> Regards,
>> Alexis.
>>
>> On Mon, 27 Feb 2023, 14:09 Remigiusz Janeczek,  wrote:
>>
>>> Hi,
>>>
>>> How to handle a case where one of the Kafka topics used for interval
>>> join is slower than the other? (Or a case where one topic lags behind)
>>> Is there a way to stop consuming from the fast topic and wait for the
>>> slow one to catch up? I want to avoid running out of memory (or keeping a
>>> very large state) and I don't want to discard any data from the fast topic
>>> until a watermark from the slow topic allows that.
>>>
>>> Best Regards
>>>
>>


Re: Fast and slow stream sources for Interval Join

2023-02-27 Thread Alexis Sarda-Espinosa
Hello,

I had this question myself and I've seen it a few times; the answer is
always the same: there's currently no official way to handle it without
state.

Regards,
Alexis.

On Mon, 27 Feb 2023, 14:09 Remigiusz Janeczek,  wrote:

> Hi,
>
> How to handle a case where one of the Kafka topics used for interval join
> is slower than the other? (Or a case where one topic lags behind)
> Is there a way to stop consuming from the fast topic and wait for the slow
> one to catch up? I want to avoid running out of memory (or keeping a very
> large state) and I don't want to discard any data from the fast topic until
> a watermark from the slow topic allows that.
>
> Best Regards
>


Re: Kubernetes operator's merging strategy for template arrays

2023-02-23 Thread Alexis Sarda-Espinosa
Ah I see, I'll have a look, thanks.

Am Do., 23. Feb. 2023 um 14:21 Uhr schrieb Gyula Fóra :

> If you are interested in helping to review this, here is the relevant
> ticket and the PR I just opened:
>
> https://issues.apache.org/jira/browse/FLINK-30786
> https://github.com/apache/flink-kubernetes-operator/pull/535
>
> Cheers,
> Gyula
>
> On Thu, Feb 23, 2023 at 2:10 PM Gyula Fóra  wrote:
>
>> Hi!
>>
>> The current array merging strategy in the operator is basically an
>> overwrite by position yes.
>> I actually have a pending improvement to make this configurable and allow
>> merging arrays by "name" attribute. This is generally more practical for
>> such cases.
>>
>> Cheers,
>> Gyula
>>
>> On Thu, Feb 23, 2023 at 1:37 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I noticed that if I set environment variables in both spec.podTemplate &
>>> spec.jobManager.podTemplate for the same container (flink-maincontainer),
>>> the values from the latter selectively overwrite the values from the
>>> former. For example, if I define something like this (omitting metadata
>>> properties):
>>>
>>> spec:
>>>   podTemplate:
>>> spec:
>>>   containers:
>>>   - name: flink-main-container
>>> env:
>>>   - name: FOO
>>> value: BAR
>>>   - name: BAZ
>>> value: BAK
>>>   jobManager:
>>> podTemplate:
>>>   spec:
>>> containers:
>>> - name: flink-main-container
>>>   env:
>>> - name: EXTRA
>>>   value: ENVVAR
>>>
>>> The final spec for the Job Manager Deployment will only contain EXTRA
>>> and BAZ, so FOO is overwritten by EXTRA.
>>>
>>> Is this expected? I am already evaluating the latest release of the
>>> operator (1.4.0).
>>>
>>> Regards,
>>> Alexis.
>>>
>>


Kubernetes operator's merging strategy for template arrays

2023-02-23 Thread Alexis Sarda-Espinosa
Hello,

I noticed that if I set environment variables in both spec.podTemplate &
spec.jobManager.podTemplate for the same container (flink-main-container),
the values from the latter selectively overwrite the values from the
former. For example, if I define something like this (omitting metadata
properties):

spec:
  podTemplate:
spec:
  containers:
  - name: flink-main-container
env:
  - name: FOO
value: BAR
  - name: BAZ
value: BAK
  jobManager:
podTemplate:
  spec:
containers:
- name: flink-main-container
  env:
- name: EXTRA
  value: ENVVAR

The final spec for the Job Manager Deployment will only contain EXTRA and
BAZ, so FOO is overwritten by EXTRA.
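
In other words, the effective env list of flink-main-container in the JM
Deployment ends up like this, apparently merged by position:

```yaml
env:
  - name: EXTRA    # overwrote FOO, which sat at the same position
    value: ENVVAR
  - name: BAZ
    value: BAK
```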

Is this expected? I am already evaluating the latest release of the
operator (1.4.0).

Regards,
Alexis.


Re: Calculation of UI's maximum non-heap memory

2023-02-21 Thread Alexis Sarda-Espinosa
Very useful, thanks a lot.

Regards,
Alexis.

Am Di., 21. Feb. 2023 um 12:04 Uhr schrieb Weihua Hu :

> Hi Alexis,
>
> The maximum Non-Heap is the sum of the memory pool (which is non-hep) max
> size. There are 3 memory pools(based on jdk11):
> 1. Metaspace,  we can control the size with JVM parameter -XX:MaxMetaspaceSize
> or Flink configuration: jobmanager.memory.jvm-metaspace.size. For your
> job, this pool size is 150m
> 2. Compressed Class Space, this is controlled by JVM parameter
> -XX:CompressedClassSpaceSize. The default value is 1G or MetaspaceSize - 2
> * InitialBootClassLoaderMetaspaceSize(default is 4194304). For your job,
> this pool size is 142m
> 3. CodeCache(codeHeap profiled nmethods/non-profiled
> nmethods/non-nmethods), this is controlled by JVM parameter
> -XX:ReservedCodeCacheSize. The default value is 240m
>
> So, the maximum non-heap is 150+142+240 = 532m.
>
>
> Best,
> Weihua
>
>
> On Tue, Feb 21, 2023 at 2:33 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Weihua,
>>
>> Thanks for your response, I am familiar with those calculations, the one
>> I don't understand is the Maximum Non-Heap value.
>>
>> Regards,
>> Alexis.
>>
>> On Tue, 21 Feb 2023, 04:45 Weihua Hu,  wrote:
>>
>>> Hi, Alexis
>>>
>>> 1. With those configuration, Flink will set JVM parameters -Xms and -Xmx
>>> to 673185792(642m),-XX:MaxDirectMemorySize to 
>>> 67108864(64m),-XX:MaxMetaspaceSize
>>> to 157286400(150m), you can find more information from [1]
>>> 2. As the hint in Flink UI: "The maximum heap displayed might differ
>>> from the configured values depending on the used GC algorithm for this
>>> process.", This[2] shows how JVM calculate the max heap memory from
>>> configured -Xms/-Xmx
>>>
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_jobmanager/
>>> [2]
>>> https://stackoverflow.com/questions/52980629/runtime-getruntime-maxmemory-calculate-method
>>>
>>> Best,
>>> Weihua
>>>
>>>
>>> On Tue, Feb 21, 2023 at 12:15 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have configured a job manager with the following settings (Flink
>>>> 1.16.1):
>>>>
>>>> jobmanager.memory.process.size: 1024m
>>>> jobmanager.memory.jvm-metaspace.size: 150m
>>>> jobmanager.memory.off-heap.size: 64m
>>>> jobmanager.memory.jvm-overhead.min: 168m
>>>> jobmanager.memory.jvm-overhead.max: 168m
>>>> jobmanager.memory.enable-jvm-direct-memory-limit: "true"
>>>>
>>>> However, when I look at the job manager dashboard in the UI, I see that
>>>> the value of Non-Heap Maximum is reported as 532 MB. Could someone clarify
>>>> how this value is calculated?
>>>>
>>>> In case it's relevant, the effective configuration for JVM Heap is
>>>> reported as 642 MB, with the reported maximum being 621 MB.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>


Re: Calculation of UI's maximum non-heap memory

2023-02-20 Thread Alexis Sarda-Espinosa
Hi Weihua,

Thanks for your response. I am familiar with those calculations; the one I
don't understand is the Maximum Non-Heap value.

Regards,
Alexis.

On Tue, 21 Feb 2023, 04:45 Weihua Hu,  wrote:

> Hi, Alexis
>
> 1. With those configuration, Flink will set JVM parameters -Xms and -Xmx
> to 673185792(642m),-XX:MaxDirectMemorySize to 
> 67108864(64m),-XX:MaxMetaspaceSize
> to 157286400(150m), you can find more information from [1]
> 2. As the hint in Flink UI: "The maximum heap displayed might differ from
> the configured values depending on the used GC algorithm for this
> process.", This[2] shows how JVM calculate the max heap memory from
> configured -Xms/-Xmx
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_jobmanager/
> [2]
> https://stackoverflow.com/questions/52980629/runtime-getruntime-maxmemory-calculate-method
>
> Best,
> Weihua
>
>
> On Tue, Feb 21, 2023 at 12:15 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I have configured a job manager with the following settings (Flink
>> 1.16.1):
>>
>> jobmanager.memory.process.size: 1024m
>> jobmanager.memory.jvm-metaspace.size: 150m
>> jobmanager.memory.off-heap.size: 64m
>> jobmanager.memory.jvm-overhead.min: 168m
>> jobmanager.memory.jvm-overhead.max: 168m
>> jobmanager.memory.enable-jvm-direct-memory-limit: "true"
>>
>> However, when I look at the job manager dashboard in the UI, I see that
>> the value of Non-Heap Maximum is reported as 532 MB. Could someone clarify
>> how this value is calculated?
>>
>> In case it's relevant, the effective configuration for JVM Heap is
>> reported as 642 MB, with the reported maximum being 621 MB.
>>
>> Regards,
>> Alexis.
>>
>>


Calculation of UI's maximum non-heap memory

2023-02-20 Thread Alexis Sarda-Espinosa
Hello,

I have configured a job manager with the following settings (Flink 1.16.1):

jobmanager.memory.process.size: 1024m
jobmanager.memory.jvm-metaspace.size: 150m
jobmanager.memory.off-heap.size: 64m
jobmanager.memory.jvm-overhead.min: 168m
jobmanager.memory.jvm-overhead.max: 168m
jobmanager.memory.enable-jvm-direct-memory-limit: "true"

However, when I look at the job manager dashboard in the UI, I see that the
value of Non-Heap Maximum is reported as 532 MB. Could someone clarify how
this value is calculated?

In case it's relevant, the effective configuration for JVM Heap is reported
as 642 MB, with the reported maximum being 621 MB.

Regards,
Alexis.


Re: Could savepoints contain in-flight data?

2023-02-13 Thread Alexis Sarda-Espinosa
Hi Hang,

Thanks for the confirmation. One follow-up question with a somewhat
convoluted scenario:

   1. An unaligned checkpoint is created.
   2. I stop the job *without* savepoint.
   3. I want to start a modified job from the checkpoint, but I changed one
   of the operator's uids.

If the operator whose uid changed had in-flight data as part of the
checkpoint, it will lose said data after starting, right?

I imagine this is not good practice, but it's just a hypothetical scenario
I wanted to understand better.

Regards,
Alexis.


Am Mo., 13. Feb. 2023 um 12:33 Uhr schrieb Hang Ruan :

> ps: the savepoint will also not contain in-flight data.
>
> Best,
> Hang
>
> Hang Ruan  于2023年2月13日周一 19:31写道:
>
>> Hi Alexis,
>>
>> No, aligned checkpoint will not contain the in-flight. Aligned checkpoint
>> makes sure that the data before the barrier has been processed and there is
>> no need to store in-flight data for one checkpoint.
>>
>> I think these documents[1][2] will help you to understand it.
>>
>>
>> Best,
>> Hang
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpointing_under_backpressure/
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/concepts/stateful-stream-processing/#checkpointing
>>
>> Alexis Sarda-Espinosa  于2023年2月11日周六 06:00写道:
>>
>>> Hello,
>>>
>>> One feature of unaligned checkpoints is that the checkpoint barriers can
>>> overtake in-flight data, so the buffers are persisted as part of the state.
>>>
>>> The documentation for savepoints doesn't mention anything explicitly, so
>>> just to be sure, will savepoints always wait for in-flight data to be
>>> processed before they are completed, or could they also persist buffers in
>>> certain situations (e.g. when there's backpressure)?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Could savepoints contain in-flight data?

2023-02-10 Thread Alexis Sarda-Espinosa
Hello,

One feature of unaligned checkpoints is that the checkpoint barriers can
overtake in-flight data, so the buffers are persisted as part of the state.
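
For reference, this is the setting I'm referring to (a minimal sketch, the
interval is arbitrary):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L);
env.getCheckpointConfig().enableUnalignedCheckpoints();
```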

The documentation for savepoints doesn't mention anything explicitly, so
just to be sure, will savepoints always wait for in-flight data to be
processed before they are completed, or could they also persist buffers in
certain situations (e.g. when there's backpressure)?

Regards,
Alexis.


Re: Backpressure due to busy sub-tasks

2022-12-16 Thread Alexis Sarda-Espinosa
Hi Martijn,

yes, that's what I meant: the throughput in the process function(s) didn't
change, so even if they were busy 100% of the time with parallelism=2, they
were processing data quickly enough.

Regards,
Alexis.

Am Fr., 16. Dez. 2022 um 14:20 Uhr schrieb Martijn Visser <
martijnvis...@apache.org>:

> Hi,
>
> Backpressure implies that it's actually a later operator that is busy. So
> in this case, that would be your process function that can't handle the
> incoming load from your Kafka source.
>
> Best regards,
>
> Martijn
>
> On Tue, Dec 13, 2022 at 7:46 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a Kafka source (the new one) in Flink 1.15 that's followed by a
>> process function with parallelism=2. Some days, I see long periods of
>> backpressure in the source. During those times, the pool-usage metrics of
>> all tasks stay between 0 and 1%, but the process function appears 100% busy.
>>
>> To try to avoid backpressure, I increased parallelism to 3. It seems to
>> help, and busy-time decreased to around 80%, but something that caught my
>> attention is that throughput remained unchanged. Concretely, if X is the
>> number of events being written to the Kafka topic every second, each
>> instance of the process function receives roughly X/2 events/s with
>> parallelism=2, and X/3 with parallelism=3.
>>
>> I'm wondering a couple of things.
>>
>> 1. Is it possible that backpressure in this case is essentially a "false
>> positive" because the function is busy 100% of the time even though it's
>> processing enough data?
>> 2. Does Flink expose any way to tune this type of backpressure mechanism?
>>
>> Regards,
>> Alexis.
>>
>


Backpressure due to busy sub-tasks

2022-12-13 Thread Alexis Sarda-Espinosa
Hello,

I have a Kafka source (the new one) in Flink 1.15 that's followed by a
process function with parallelism=2. Some days, I see long periods of
backpressure in the source. During those times, the pool-usage metrics of
all tasks stay between 0 and 1%, but the process function appears 100% busy.

To try to avoid backpressure, I increased parallelism to 3. It seems to
help, and busy-time decreased to around 80%, but something that caught my
attention is that throughput remained unchanged. Concretely, if X is the
number of events being written to the Kafka topic every second, each
instance of the process function receives roughly X/2 events/s with
parallelism=2, and X/3 with parallelism=3.

I'm wondering a couple of things.

1. Is it possible that backpressure in this case is essentially a "false
positive" because the function is busy 100% of the time even though it's
processing enough data?
2. Does Flink expose any way to tune this type of backpressure mechanism?

Regards,
Alexis.


Re: Clarification on checkpoint cleanup with RETAIN_ON_CANCELLATION

2022-12-12 Thread Alexis Sarda-Espinosa
Hi Hangxiang,

after some more digging, I think the job ID is maintained not because of
Flink HA, but because of the Kubernetes operator. It seems to me that
"savepoint" upgrade mode should ideally alter job ID when starting from the
savepoint, but I'm not sure.

Regards,
Alexis.

Am Mo., 12. Dez. 2022 um 10:31 Uhr schrieb Hangxiang Yu :

> Hi Alexis.
> IIUC, by default, the job id of the new job should be different if you
> restore from a stopped job ? Whether to cleanup is related to the savepoint
> restore mode.
> Just in the case of failover, the job id should not change, and everything
> in the checkpoint dir will be claimed as you said.
>
> > And a related question for a slightly different scenario, if I
> use ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION and trigger a
> stop-job-with-savepoint, does that trigger checkpoint deletion?
> In this case, the checkpoint will be cleaned and not retained and the
> savepoint will remain. So you still could use savepoint to restore.
>
> On Mon, Dec 5, 2022 at 6:33 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a doubt about a very particular scenario with this configuration:
>>
>> - Flink HA enabled (Kubernetes).
>> - ExternalizedCheckpointCleanup set to RETAIN_ON_CANCELLATION.
>> - Savepoint restore mode left as default NO_CLAIM.
>>
>> During an upgrade, a stop-job-with-savepoint is triggered, and then that
>> savepoint is used to start the upgraded job. Based on what I see, since HA
>> is enabled, the job ID doesn't change. Additionally, I understand the first
>> checkpoint after restoration will be a full one so that there's no
>> dependency on the used savepoint. However, since the job ID didn't change,
>> the new checkpoint still shares a path with "older" checkpoints, e.g.
>> /.../job_id/chk-1234.
>>
>> In this case, does this mean everything under /.../job_id/ *except*
>> shared/, taskowned/, and any chk-*/ folder whose id is smaller than 1234
>> could be deleted? I imagine even some files under shared/ could be deleted
>> as well, although that might be harder to identify.
>>
>> And a related question for a slightly different scenario, if I
>> use ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION and trigger a
>> stop-job-with-savepoint, does that trigger checkpoint deletion?
>>
>> Regards,
>> Alexis.
>>
>
>
> --
> Best,
> Hangxiang.
>


Re: Deterministic co-results

2022-12-08 Thread Alexis Sarda-Espinosa
Hi Salva,

Just to give you further thoughts from another user, I think the "temporal
join" semantics are very critical in this use case, and what you implement
for that may not easily generalize to other cases. Because of that, I'm not
sure if you can really define best practices that apply in general.
Additionally, you also have to take idleness into account, given that using
event-time could leave you in a "frozen" state if you're not receiving
events continuously.

I also doubt you can accurately estimate out-of-orderness in this scenario
due to the particularities of Flink's network stack [1]. Even if you only
have 2 sources and immediately connect them together, parallelism and the
resulting shuffles can change from one execution to the other even if you
don't change the logic at all, because scheduling is also non-deterministic
and the "distribution" of events across different parallel instances of
your sources could vary a lot as well.

I think that others will tell you that you indeed need to find a way to
buffer events for a while, I got the same advice in the past. Focusing very
specifically on what you described (streams for data & control events +
event-time + temporal join), but maybe also assuming you can manage
watermarks in a way that handles idleness without simply freezing the
stream, I once tried a custom operator (not function) to force the
watermarks of 1 stream to wait for the other one -
see PrioritizedWatermarkStreamIntervalJoinOperator in [2]. This still uses
the idea of buffering, it just moves that responsibility to the operator
that's already handling "window data" for the join. Also, it extends an
internal class, so it's very much unofficial, and it's probably too
specific to my use case, but maybe it gives you other ideas to consider.

And one last detail to further exemplify the complexity here: when I was
testing my custom operator with event-time simulations in my IDE, I
initially didn't think about the fact that a watermark with Long.MAX_VALUE
is sent at the end of the simulation. That was another source of
non-determinism, because sometimes the control stream was processed entirely
(including the max watermark) before a single event from the data stream
went through, which meant that all events from the data stream were
considered late arrivals and silently dropped.

[1] https://flink.apache.org/2019/06/05/flink-network-stack.html
[2] https://lists.apache.org/thread/n79tsqd5y474n5th1wdhkrh7bvlts8bs

Regards,
Alexis.

Am Do., 8. Dez. 2022 um 15:20 Uhr schrieb Salva Alcántara <
salcantara...@gmail.com>:

> Just for adding some extra references:
>
> [5]
> https://stackoverflow.com/questions/50536364/apache-flink-coflatmapfunction-does-not-process-events-in-event-time-order
> [6]
> https://stackoverflow.com/questions/61046062/controlling-order-of-processed-elements-within-coprocessfunction-using-custom-so
> [7]
> https://stackoverflow.com/questions/48692658/how-to-concatenate-two-streams-in-apache-flink/48711260#48711260
>
> Salva
>
> On 2022/12/07 18:52:42 Salva Alcántara wrote:
> > It's well-known that Flink does not provide any guarantees on the order
> in
> > which a CoProcessFunction (or one of its multiple variations) processes
> its
> > two inputs [1]. I wonder then what is the current best
> practice/recommended
> > approach for cases where one needs deterministic results in presence of:
> >
> > 1. A control stream
> > 2. An event/data stream
> >
> > Let's consider event-time semantics; both streams have timestamps, and we
> > want to implement "temporal join" semantics where the input events are
> > controlled based on the latest control signals received before them,
> i.e.,
> > the ones "active" when the events happened. For simplicity, let's assume
> > that all events are received in order, so that the only source of
> > non-determinism is the one introduced by the CoProcessFunction itself.
> >
> > I'm considering the following options:
> >
> > 1. Buffer events inside the CoProcessFunction for some time, while saving
> > all the control signals in state (indexed by time)
> > 2. Same as before but doing the pre-buffering of the event/data streams
> > before the CoProcessFunction
> > 3. Similar as before but considering both streams together by
> multiplexing
> > them into one heterogeneous stream which would be pre-sorted in order to
> > guarantee the global ordering of the events from the two different
> sources.
> > Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a
> > ProcessFunction[Either[IN1, IN2], OUT] which by construction will process
> > the data in order and hence produce deterministic results
> >
> > Essentially, all the strategies amount to introducing a "minimum amount
> of
> > delay" to guarantee the deterministic processing, which brings me to the
> > following question:
> >
> > * How to get an estimate for the out-of-order-ness bound that a
> > CoProcessFunction can introduce? Is that even possible in the first
> place?
> > I guess this 

Re: Cleanup for high-availability.storageDir

2022-12-08 Thread Alexis Sarda-Espinosa
Hi Matthias,

I think you didn't include the mailing list in your response.

According to my experiments, using last-state means the operator simply
deletes the Flink pods, and I believe that doesn't count as Cancelled, so
the artifacts for blobs and submitted job graphs are not cleaned up. I
imagine the same logic Gyula mentioned before applies, namely keep the
latest one and clean the older ones.

Regards,
Alexis.

Am Do., 8. Dez. 2022 um 10:37 Uhr schrieb Matthias Pohl <
matthias.p...@aiven.io>:

> I see, I confused the Flink-internal recovery with what the Flink
> Kubernetes Operator does for redeploying the Flink job. AFAIU, when you do
> an upgrade of your job, the operator will cancel the Flink job (I'm
> assuming now that you use Flink's Application mode rather than Session
> mode). The operator cancelled your job and shuts down the cluster.
> Checkpoints are retained and, therefore, can be used as the so-called "last
> state" when redeploying your job using the same Job ID. In that case, the
> corresponding jobGraph and other BLOBs should be cleaned up by Flink
> itself. The checkpoint files are retained, i.e. survive the Flink cluster
> shutdown.
>
> When redeploying the Flink cluster with the (updated) job, a new JobGraph
> file is created by Flink internally. BLOBs are recreated as well. New
> checkpoints are going to be created and old ones (that are not needed
> anymore) are cleaned up.
>
> Just to recap what I said before (to make it more explicit to
> differentiate what the k8s operator does and what Flink does internally):
> Removing the artifacts you were talking about in your previous post would
> harm Flink's internal recovery mechanism. That's probably not what you want.
>
> @Gyula: Please correct me if I misunderstood something here.
>
> I hope that helped.
> Matthias
>
> On Wed, Dec 7, 2022 at 4:19 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> I see, thanks for the details.
>>
>> I do mean replacing the job without stopping it terminally. Specifically,
>> I mean updating the container image with one that contains an updated job
>> jar. Naturally, the new version must not break state compatibility, but as
>> long as that is fulfilled, the job should be able to use the last
>> checkpoint as starting point. It's my understanding that this is how the
>> Kubernetes operator's "last-state" upgrade mode works [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>
>> Regards,
>> Alexis.
>>
>> Am Mi., 7. Dez. 2022 um 15:54 Uhr schrieb Matthias Pohl <
>> matthias.p...@aiven.io>:
>>
>>> > - job_name/submittedJobGraphX
>>> submittedJobGraph* is the persisted JobGraph that would be picked up in
>>> case of a failover. Deleting this file would result in Flink's failure
>>> recovery not working properly anymore if the job is still executed but
>>> needs to be restarted because the actual job definition is gone.
>>>
>>> > completedCheckpointXYZ
>>> This is the persisted CompletedCheckpoint with a reference to the actual
>>> Checkpoint directory. Deleting this file would be problematic if the state
>>> recovery relies in some way on this specific checkpoint. The HA data relies
>>> on this file to be present. Failover only fails if there's no newer
>>> checkpoint or the HA data still refers to this checkpoint in some way.
>>>
>>> > - job_name/blob/job_uuid/blob_...
>>> Artifacts of the BlobServer containing runtime artifacts of the jobs
>>> (e.g. logs, libraries, ...)
>>>
>>> In general, you don't want to clean HA artifacts if the job hasn't
>>> reached a terminal state, yet, as it harms Flink's ability to recover the
>>> job. Additionally, these files are connected with the HA backend, i.e. the
>>> file path is stored in the HA backend. Removing the artifacts will likely
>>> result in metadata becoming invalid.
>>>
>>> What do you mean with "testing updates *without* savepoints"? Are you
>>> referring to replacing the job's business logic without stopping the job?
>>>
>>> On Wed, Dec 7, 2022 at 3:17 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Then the explanation is likely that the job has not reached a terminal
>>>> state. I was testing updates *without* savepoints (but with HA), so I guess
>>>> that never triggers automatic cleanup.
&

Re: Cleanup for high-availability.storageDir

2022-12-07 Thread Alexis Sarda-Espinosa
I see, thanks for the details.

I do mean replacing the job without stopping it terminally. Specifically, I
mean updating the container image with one that contains an updated job
jar. Naturally, the new version must not break state compatibility, but as
long as that is fulfilled, the job should be able to use the last
checkpoint as starting point. It's my understanding that this is how the
Kubernetes operator's "last-state" upgrade mode works [1].

[1]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades

Regards,
Alexis.

Am Mi., 7. Dez. 2022 um 15:54 Uhr schrieb Matthias Pohl <
matthias.p...@aiven.io>:

> > - job_name/submittedJobGraphX
> submittedJobGraph* is the persisted JobGraph that would be picked up in
> case of a failover. Deleting this file would result in Flink's failure
> recovery not working properly anymore if the job is still executed but
> needs to be restarted because the actual job definition is gone.
>
> > completedCheckpointXYZ
> This is the persisted CompletedCheckpoint with a reference to the actual
> Checkpoint directory. Deleting this file would be problematic if the state
> recovery relies in some way on this specific checkpoint. The HA data relies
> on this file to be present. Failover only fails if there's no newer
> checkpoint or the HA data still refers to this checkpoint in some way.
>
> > - job_name/blob/job_uuid/blob_...
> Artifacts of the BlobServer containing runtime artifacts of the jobs (e.g.
> logs, libraries, ...)
>
> In general, you don't want to clean HA artifacts if the job hasn't reached
> a terminal state, yet, as it harms Flink's ability to recover the job.
> Additionally, these files are connected with the HA backend, i.e. the file
> path is stored in the HA backend. Removing the artifacts will likely result
> in metadata becoming invalid.
>
> What do you mean with "testing updates *without* savepoints"? Are you
> referring to replacing the job's business logic without stopping the job?
>
> On Wed, Dec 7, 2022 at 3:17 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Matthias,
>>
>> Then the explanation is likely that the job has not reached a terminal
>> state. I was testing updates *without* savepoints (but with HA), so I guess
>> that never triggers automatic cleanup.
>>
>> Since, in my case, the job will theoretically never reach a terminal
>> state with this configuration, would it cause issues if I clean the
>> artifacts manually?
>>
>> *And for completeness, I also see an artifact called
>> completedCheckpointXYZ which is updated over time, and I imagine that
>> should never be removed.
>>
>> Regards,
>> Alexis.
>>
>> Am Mi., 7. Dez. 2022 um 13:03 Uhr schrieb Matthias Pohl <
>> matthias.p...@aiven.io>:
>>
>>> Flink should already take care of cleaning the artifacts you mentioned.
>>> Flink 1.15+ even includes retries if something went wrong. There are still
>>> a few bugs that need to be fixed (e.g. FLINK-27355 [1]). Checkpoint HA data
>>> is not properly cleaned up, yet, which is covered by FLIP-270 [2].
>>>
>>> It would be interesting to know why these artifacts haven't been deleted
>>> assuming that the corresponding job is actually in a final state (e.g.
>>> FAILED, CANCELLED, FINISHED), i.e. there is a JobResultStoreEntry file for
>>> that specific job available in the folder Gyula was referring to in the
>>> linked documentation. At least for the JobGraph files, it's likely that you
>>> have additional metadata still stored in your HA backend (that refers to
>>> the files). That would be something you might want to clean up as well, if
>>> you want to do a proper cleanup. But still, it would be good to understand
>>> why these files are not cleaned up by Flink.
>>>
>>> Best,
>>> Matthias
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-27355
>>> [2]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
>>>
>>> On Tue, Dec 6, 2022 at 5:42 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> One concrete question, under the HA folder I also see these sample
>>>> entries:
>>>>
>>>> - job_name/blob/job_uuid/blob_...
>>>> - job_name/submittedJobGraphX
>>>> - job_name/submittedJobGraphY
>>>>
>>>> Is it safe to clean these up when the job is in a healthy state?
>>>>
>>

Re: Cleanup for high-availability.storageDir

2022-12-07 Thread Alexis Sarda-Espinosa
Hi Matthias,

Then the explanation is likely that the job has not reached a terminal
state. I was testing updates *without* savepoints (but with HA), so I guess
that never triggers automatic cleanup.

Since, in my case, the job will theoretically never reach a terminal state
with this configuration, would it cause issues if I clean the artifacts
manually?

*And for completeness, I also see an artifact called completedCheckpointXYZ
which is updated over time, and I imagine that should never be removed.

Regards,
Alexis.

Am Mi., 7. Dez. 2022 um 13:03 Uhr schrieb Matthias Pohl <
matthias.p...@aiven.io>:

> Flink should already take care of cleaning the artifacts you mentioned.
> Flink 1.15+ even includes retries if something went wrong. There are still
> a few bugs that need to be fixed (e.g. FLINK-27355 [1]). Checkpoint HA data
> is not properly cleaned up, yet, which is covered by FLIP-270 [2].
>
> It would be interesting to know why these artifacts haven't been deleted
> assuming that the corresponding job is actually in a final state (e.g.
> FAILED, CANCELLED, FINISHED), i.e. there is a JobResultStoreEntry file for
> that specific job available in the folder Gyula was referring to in the
> linked documentation. At least for the JobGraph files, it's likely that you
> have additional metadata still stored in your HA backend (that refers to
> the files). That would be something you might want to clean up as well, if
> you want to do a proper cleanup. But still, it would be good to understand
> why these files are not cleaned up by Flink.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-27355
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
>
> On Tue, Dec 6, 2022 at 5:42 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> One concrete question, under the HA folder I also see these sample
>> entries:
>>
>> - job_name/blob/job_uuid/blob_...
>> - job_name/submittedJobGraphX
>> - job_name/submittedJobGraphY
>>
>> Is it safe to clean these up when the job is in a healthy state?
>>
>> Regards,
>> Alexis.
>>
>> On Mon, Dec 5, 2022 at 20:09, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>
>>> Hi Gyula,
>>>
>>> that certainly helps, but to set up automatic cleanup (in my case, of
>>> azure blob storage), the ideal option would be to be able to set a simple
>>> policy that deletes blobs that haven't been updated in some time, but that
>>> would assume that anything that's actually relevant for the latest state is
>>> "touched" by the JM on every checkpoint, and since I also see blobs
>>> referencing "submitted job graphs", I imagine that might not be a safe
>>> assumption.
>>>
>>> I understand the life cycle of those blobs isn't directly managed by the
>>> operator, but in that regard it could make things more cumbersome.
>>>
>>> Ideally, Flink itself would guarantee this sort of allowable TTL for HA
>>> files, but I'm sure that's not trivial.
>>>
>>> Regards,
>>> Alexis.
>>>
>>> On Mon, 5 Dec 2022, 19:19 Gyula Fóra,  wrote:
>>>
>>>> Hi!
>>>>
>>>> There are some files that are not cleaned up over time in the HA dir
>>>> that need to be cleaned up by the user:
>>>>
>>>>
>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
>>>>
>>>>
>>>> Hope this helps
>>>> Gyula
>>>>
>>>> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I see the number of entries in the directory configured for HA
>>>>> increases over time, particularly in the context of job upgrades in a
>>>>> Kubernetes environment managed by the operator. Would it be safe to assume
>>>>> that any files that haven't been updated in a while can be deleted?
>>>>> Assuming the checkpointing interval is much smaller than the period used 
>>>>> to
>>>>> determine if files are too old.
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>


Re: Cleanup for high-availability.storageDir

2022-12-06 Thread Alexis Sarda-Espinosa
One concrete question, under the HA folder I also see these sample entries:

- job_name/blob/job_uuid/blob_...
- job_name/submittedJobGraphX
- job_name/submittedJobGraphY

Is it safe to clean these up when the job is in a healthy state?

Regards,
Alexis.

On Mon, Dec 5, 2022 at 20:09, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:

> Hi Gyula,
>
> that certainly helps, but to set up automatic cleanup (in my case, of
> azure blob storage), the ideal option would be to be able to set a simple
> policy that deletes blobs that haven't been updated in some time, but that
> would assume that anything that's actually relevant for the latest state is
> "touched" by the JM on every checkpoint, and since I also see blobs
> referencing "submitted job graphs", I imagine that might not be a safe
> assumption.
>
> I understand the life cycle of those blobs isn't directly managed by the
> operator, but in that regard it could make things more cumbersome.
>
> Ideally, Flink itself would guarantee this sort of allowable TTL for HA
> files, but I'm sure that's not trivial.
>
> Regards,
> Alexis.
>
> On Mon, 5 Dec 2022, 19:19 Gyula Fóra,  wrote:
>
>> Hi!
>>
>> There are some files that are not cleaned up over time in the HA dir that
>> need to be cleaned up by the user:
>>
>>
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
>>
>>
>> Hope this helps
>> Gyula
>>
>> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I see the number of entries in the directory configured for HA increases
>>> over time, particularly in the context of job upgrades in a Kubernetes
>>> environment managed by the operator. Would it be safe to assume that any
>>> files that haven't been updated in a while can be deleted? Assuming the
>>> checkpointing interval is much smaller than the period used to determine if
>>> files are too old.
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Re: Cleanup for high-availability.storageDir

2022-12-05 Thread Alexis Sarda-Espinosa
Hi Gyula,

that certainly helps, but to set up automatic cleanup (in my case, of Azure
Blob Storage), the ideal option would be a simple policy that deletes blobs
that haven't been updated in some time. However, that would assume that
anything that's actually relevant for the latest state is "touched" by the JM
on every checkpoint, and since I also see blobs referencing "submitted job
graphs", I imagine that might not be a safe assumption.

I understand the life cycle of those blobs isn't directly managed by the
operator, but in that regard it could make things more cumbersome.

Ideally, Flink itself would guarantee this sort of allowable TTL for HA
files, but I'm sure that's not trivial.

Regards,
Alexis.

On Mon, 5 Dec 2022, 19:19 Gyula Fóra,  wrote:

> Hi!
>
> There are some files that are not cleaned up over time in the HA dir that
> need to be cleaned up by the user:
>
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
>
>
> Hope this helps
> Gyula
>
> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I see the number of entries in the directory configured for HA increases
>> over time, particularly in the context of job upgrades in a Kubernetes
>> environment managed by the operator. Would it be safe to assume that any
>> files that haven't been updated in a while can be deleted? Assuming the
>> checkpointing interval is much smaller than the period used to determine if
>> files are too old.
>>
>> Regards,
>> Alexis.
>>
>>


Cleanup for high-availability.storageDir

2022-12-05 Thread Alexis Sarda-Espinosa
Hello,

I see the number of entries in the directory configured for HA increases
over time, particularly in the context of job upgrades in a Kubernetes
environment managed by the operator. Would it be safe to assume that any
files that haven't been updated in a while can be deleted? Assuming the
checkpointing interval is much smaller than the period used to determine if
files are too old.

Regards,
Alexis.


Clarification on checkpoint cleanup with RETAIN_ON_CANCELLATION

2022-12-05 Thread Alexis Sarda-Espinosa
Hello,

I have a doubt about a very particular scenario with this configuration:

- Flink HA enabled (Kubernetes).
- ExternalizedCheckpointCleanup set to RETAIN_ON_CANCELLATION.
- Savepoint restore mode left as default NO_CLAIM.

During an upgrade, a stop-job-with-savepoint is triggered, and then that
savepoint is used to start the upgraded job. Based on what I see, since HA
is enabled, the job ID doesn't change. Additionally, I understand the first
checkpoint after restoration will be a full one so that there's no
dependency on the used savepoint. However, since the job ID didn't change,
the new checkpoint still shares a path with "older" checkpoints, e.g.
/.../job_id/chk-1234.

In this case, does this mean everything under /.../job_id/ *except*
shared/, taskowned/, and any chk-*/ folder whose id is smaller than 1234
could be deleted? I imagine even some files under shared/ could be deleted
as well, although that might be harder to identify.

And a related question for a slightly different scenario, if I
use ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION and trigger a
stop-job-with-savepoint, does that trigger checkpoint deletion?

Regards,
Alexis.


Re: Savepoint restore mode for the Kubernetes operator

2022-11-29 Thread Alexis Sarda-Espinosa
Just to be clear, I don't think the operator must have special logic to
find out if a savepoint was used as the base for an incremental checkpoint;
however, the operator logic might want to completely disable savepoint
cleanup for a deployment if the user enabled CLAIM mode for it. At least
that sounds like the safer option to me.

Regards,
Alexis.

On Tue, 29 Nov 2022, 10:31 Gyula Fóra,  wrote:

> The operator might call dispose on an old savepoint that’s true, but I am
> not sure if the dispose api call would actually corrupt it.
>
> Gyula
>
> On Tue, 29 Nov 2022 at 09:28, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Hangxiang,
>>
>> but, if I understand correctly, setting restore mode to CLAIM means that
>> the job might create a new incremental checkpoint based on the savepoint,
>> right? And if the operator then decides to clean up the savepoint, the
>> checkpoint would be corrupted, no?
>>
>> Regards,
>> Alexis.
>>
>> On Mon, Nov 28, 2022 at 05:17, Hangxiang Yu <master...@gmail.com> wrote:
>>
>>> Hi, Alexis.
>>> IIUC, there is no conflict between savepoint history and restore mode.
>>> Restore mode cares about whether/how we manage the savepoint of the old job.
>>> Savepoint management in the operator only cares about the savepoint history
>>> of the new job.
>>> In other words, savepoint cleanup should not clean the savepoint from
>>> the old job, which should only be controlled by the restore mode.
>>> So I think you could also set restore mode according to your needs.
>>>
>>>
>>> On Wed, Nov 16, 2022 at 10:41 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Is there a recommended configuration for the restore mode of jobs
>>>> managed by the operator?
>>>>
>>>> Since the documentation states that the operator keeps a savepoint
>>>> history to perform cleanup, I imagine restore mode should always be
>>>> NO_CLAIM, but I just want to confirm.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>
>>>
>>> --
>>> Best,
>>> Hangxiang.
>>>
>>


Re: Savepoint restore mode for the Kubernetes operator

2022-11-29 Thread Alexis Sarda-Espinosa
Hi Hangxiang,

but, if I understand correctly, setting restore mode to CLAIM means that
the job might create a new incremental checkpoint based on the savepoint,
right? And if the operator then decides to clean up the savepoint, the
checkpoint would be corrupted, no?

Regards,
Alexis.

On Mon, Nov 28, 2022 at 05:17, Hangxiang Yu wrote:

> Hi, Alexis.
> IIUC, there is no conflict between savepoint history and restore mode.
> Restore mode cares about whether/how we manage the savepoint of the old job.
> Savepoint management in the operator only cares about the savepoint history of
> the new job.
> In other words, savepoint cleanup should not clean the savepoint from the
> old job, which should only be controlled by the restore mode.
> So I think you could also set restore mode according to your needs.
>
>
> On Wed, Nov 16, 2022 at 10:41 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Is there a recommended configuration for the restore mode of jobs managed
>> by the operator?
>>
>> Since the documentation states that the operator keeps a savepoint
>> history to perform cleanup, I imagine restore mode should always be
>> NO_CLAIM, but I just want to confirm.
>>
>> Regards,
>> Alexis.
>>
>
>
> --
> Best,
> Hangxiang.
>


Kubernetes operator and jobs with last-state upgrades

2022-11-16 Thread Alexis Sarda-Espinosa
Hello,

I am doing some tests with the operator and, if I'm not mistaken, using
last-state upgrade means that, when something is changed in the CR, no
savepoint is taken and the pods are simply terminated. Is that a
requirement from Flink HA? I would have thought last-state would still use
savepoints for upgrade if the current status is stable.
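
To be concrete, I mean a spec roughly like this (trimmed; the jar path and
parallelism are made up):

spec:
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar
    parallelism: 2
    upgradeMode: last-state   # as opposed to savepoint / stateless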

Regards,
Alexis.


Re: Owner reference with the Kubernetes operator

2022-11-16 Thread Alexis Sarda-Espinosa
Ah I see, cool, thanks.

Regards,
Alexis.

On Wed, Nov 16, 2022 at 15:50, Gyula Fóra wrote:

> This has been changed in the current snapshot release:
> https://issues.apache.org/jira/browse/FLINK-28979
>
> It will be part of the 1.3.0 version.
>
> On Wed, Nov 16, 2022 at 3:32 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Is there a particular reason the operator doesn't set owner references
>> for the Deployments it creates as a result of a FlinkDeployment CR? This
>> makes tracking in the Argo CD UI impossible. (To be clear, I mean a
>> reference from the Deployment to the FlinkDeployment).
>>
>> Regards,
>> Alexis.
>>
>>


Savepoint restore mode for the Kubernetes operator

2022-11-16 Thread Alexis Sarda-Espinosa
Hello,

Is there a recommended configuration for the restore mode of jobs managed
by the operator?

Since the documentation states that the operator keeps a savepoint history
to perform cleanup, I imagine restore mode should always be NO_CLAIM, but I
just want to confirm.

Regards,
Alexis.


Owner reference with the Kubernetes operator

2022-11-16 Thread Alexis Sarda-Espinosa
Hello,

Is there a particular reason the operator doesn't set owner references for
the Deployments it creates as a result of a FlinkDeployment CR? This makes
tracking in the Argo CD UI impossible. (To be clear, I mean a reference
from the Deployment to the FlinkDeployment).

Regards,
Alexis.


Broadcast state and job restarts

2022-10-27 Thread Alexis Sarda-Espinosa
Hello,

The documentation for broadcast state specifies that it is always kept in
memory. My assumptions based on this statement are:

1. If a job restarts in the same Flink cluster (i.e. using a restart
strategy), the tasks' attempt number increases and the broadcast state is
restored since it's not lost from memory.
2. If the whole Flink cluster is restarted with a savepoint, broadcast
state will not be restored and I need to write my application with this in
mind.

Are these correct?

Regards,
Alexis.


Broadcast state restoration for BroadcastProcessFunction

2022-10-14 Thread Alexis Sarda-Espinosa
Hello,

I wrote a test for a broadcast function to check how it handles broadcast
state during retries [1] (the gist only shows a subset of the test in
Kotlin, but it's hopefully understandable). The test will not pass unless
my function also implements CheckpointedFunction, although the implementations
of that interface's methods can be empty - the state is empty in this case,
even though its descriptor is registered with the harness.

Is this requirement specific to the test harness API?
Otherwise BaseBroadcastProcessFunction should implement
CheckpointedFunction, maybe with empty default methods, no?
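
To be explicit, the test only passes if the function looks roughly like this (a
minimal Java sketch with String types everywhere, not the actual Kotlin code
from the gist):

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class MyBroadcastFunction
        extends BroadcastProcessFunction<String, String, String>
        implements CheckpointedFunction {

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
        // read-only access to the broadcast state would happen here
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out) {
        // updates to the broadcast state would happen here
    }

    // Empty implementations are enough for the harness to snapshot/restore the state.
    @Override
    public void snapshotState(FunctionSnapshotContext context) {}

    @Override
    public void initializeState(FunctionInitializationContext context) {}
}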

[1] https://gist.github.com/asardaes/b804b7ed04ace176881189c3d1cf842a

Regards,
Alexis.


Re: Partial broadcast/keyed connected streams

2022-10-11 Thread Alexis Sarda-Espinosa
Oh wow, I had read that documentation so many times and I was sure that API
also expected the broadcasted side to have a key like the other side, but
that's not the case, that is already what I was thinking of. Thanks.
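
For anyone who finds this later, the pattern described below boils down to
something like this (a sketch, untested; Event, Control and Output are made-up
classes):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, Control> controlDescriptor =
        new MapStateDescriptor<>("control", String.class, Control.class);
BroadcastStream<Control> broadcast = controlStream.broadcast(controlDescriptor);

mainStream
        .keyBy(event -> event.getKey())
        .connect(broadcast)
        .process(new KeyedBroadcastProcessFunction<String, Event, Control, Output>() {
            @Override
            public void processElement(Event value, ReadOnlyContext ctx, Collector<Output> out) {
                // keyed state is accessible here, as in any keyed process function
            }

            @Override
            public void processBroadcastElement(Control value, Context ctx, Collector<Output> out) {
                // broadcast state updates here reach all parallel instances
            }
        });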

Regards,
Alexis.

On Wed, 12 Oct 2022, 03:42 仙路尽头谁为峰,  wrote:

> Hi Alexis:
>
>    The broadcast state pattern should be done by calling connect() on
> the non-broadcasted stream, with the *broadcast stream* as an argument.
>
>    And if the main stream is a KeyedStream, then the processElement
> function will have access to keyed state just like in a normal keyed stream.
>
>
>
> Best Regards!
>
> Sent from Mail for Windows <https://go.microsoft.com/fwlink/?LinkId=550986>
>
>
>
> *From: *Alexis Sarda-Espinosa 
> *Sent: *October 12, 2022, 4:11
> *To: *user 
> *Subject: *Partial broadcast/keyed connected streams
>
>
>
> Hi everyone,
>
>
>
> I am currently thinking about a use case for a streaming job and, while
> I'm fairly certain it cannot be done with the APIs that Flink currently
> provides, I figured I'd put it out there in case other users think
> something like this would be useful to a wider audience.
>
>
>
> The current broadcasting mechanisms offered by Flink mention use cases
> where "control events" are needed. In my case I would also have control
> events, and I would need to broadcast them to *all parallel instances* of
> any downstream operators that consume the events. However, some of those
> operators have to be keyed because they are stateful. From the API's point
> of view, I'd imagine something like
>
>
>
>
> controlStream.connect(mainStream).broadcastFirstKeySecondBy(keySelector).process(PartiallyKeyedCoProcessFunction)
>
>
>
> The function would also have something like processElement1 and
> processElement2, but one of those methods wouldn't have access to
> partitioned state (or could it have access to state for all key groups
> handled by that instance?).
>
>
>
> Since I'm not familiar with all of Flink's internals, I don't know if this
> would be even remotely feasible, but I'd like to know if others have
> opinions on this.
>
>
>
> Regards,
>
> Alexis.
>
>
>
>
>


Partial broadcast/keyed connected streams

2022-10-11 Thread Alexis Sarda-Espinosa
Hi everyone,

I am currently thinking about a use case for a streaming job and, while I'm
fairly certain it cannot be done with the APIs that Flink currently
provides, I figured I'd put it out there in case other users think
something like this would be useful to a wider audience.

The current broadcasting mechanisms offered by Flink mention use cases
where "control events" are needed. In my case I would also have control
events, and I would need to broadcast them to *all parallel instances* of
any downstream operators that consume the events. However, some of those
operators have to be keyed because they are stateful. From the API's point
of view, I'd imagine something like

controlStream.connect(mainStream).broadcastFirstKeySecondBy(keySelector).process(PartiallyKeyedCoProcessFunction)

The function would also have something like processElement1 and
processElement2, but one of those methods wouldn't have access to
partitioned state (or could it have access to state for all key groups
handled by that instance?).

Since I'm not familiar with all of Flink's internals, I don't know if this
would be even remotely feasible, but I'd like to know if others have
opinions on this.

Regards,
Alexis.


Re: Window state size with global window and custom trigger

2022-10-10 Thread Alexis Sarda-Espinosa
Thanks for the confirmation :)
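
For reference, attaching a TTL to a user-declared state descriptor (as
suggested) would look roughly like this - a sketch with made-up names:

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

ListStateDescriptor<String> descriptor =
        new ListStateDescriptor<>("my-global-state", String.class);
descriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build());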

Regards,
Alexis.

On Sun, 9 Oct 2022, 10:37 Hangxiang Yu,  wrote:

> Hi, Alexis.
> I think you are right. It also applies for a global window with a custom
> trigger.
> If you apply a ReduceFunction or AggregateFunction, the window state size
> usually is smaller than applying ProcessWindowFunction due to the
> aggregated value. It also works for global windows.
> Of course, the state size of a global window also depends on how you
> implement your trigger.
> BTW, we often use TTL to reduce the state size of the global window.
> Hope these can help you.
>
>
> On Sat, Oct 8, 2022 at 4:49 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I found an SO thread that clarifies some details of window state size
>> [1]. I would just like to confirm that this also applies when using a
>> global window with a custom trigger.
>>
>> The reason I ask is that the TriggerResult API is meant to cover all
>> supported scenarios, so FIRE vs FIRE_AND_PURGE is relevant, for example,
>> for a ProcessWindowFunction that holds all input records until it fires.
>> However, I assume there would be no distinction if I use a
>> (Rich)AggregateFunction, regardless of window type (global vs timed), but
>> I'd like to be sure.
>>
>> Regards,
>> Alexis.
>>
>> [1]
>> https://stackoverflow.com/questions/55247668/flink-window-state-size-and-state-management
>>
>>
>
> --
> Best,
> Hangxiang.
>


Window state size with global window and custom trigger

2022-10-07 Thread Alexis Sarda-Espinosa
Hello,

I found an SO thread that clarifies some details of window state size [1].
I would just like to confirm that this also applies when using a global
window with a custom trigger.

The reason I ask is that the TriggerResult API is meant to cover all
supported scenarios, so FIRE vs FIRE_AND_PURGE is relevant, for example,
for a ProcessWindowFunction that holds all input records until it fires.
However, I assume there would be no distinction if I use a
(Rich)AggregateFunction, regardless of window type (global vs timed), but
I'd like to be sure.
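
To make it concrete, the setup I have in mind is roughly the following
(simplified; CountTrigger stands in for the custom trigger, and "input" is a
DataStream<String>):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

DataStream<Long> counts = input
        .keyBy(value -> value)
        .window(GlobalWindows.create())
        .trigger(CountTrigger.of(100))
        .aggregate(new AggregateFunction<String, Long, Long>() {
            @Override public Long createAccumulator() { return 0L; }
            @Override public Long add(String value, Long accumulator) { return accumulator + 1; }
            @Override public Long getResult(Long accumulator) { return accumulator; }
            @Override public Long merge(Long a, Long b) { return a + b; }
        });
// My assumption: only the Long accumulator is kept as window state here.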

Regards,
Alexis.

[1]
https://stackoverflow.com/questions/55247668/flink-window-state-size-and-state-management


Serialization in window contents and network buffers

2022-09-27 Thread Alexis Sarda-Espinosa
Hi everyone,

I know the low level details of this are likely internal, but at a high
level we can say that operators usually have some state associated with
them. Particularly for error handling and job restarts, I imagine windows
must persist state, and operators in general probably persist network
buffers for anything that hasn't been processed or emitted.

Does Flink's serialization stack with type information apply to these cases?

I ask specifically due to a use case I'm evaluating. I would have an
interface as output of some operators, and all those outputs would go into
a windowing operator. However, each non-windowing operator would emit a
different concrete class that implements the interface.

If I add @TypeInfo annotations to the concrete implementers, would Flink
find them and use them even if the operators are defined in terms of the
interface?

Regards,
Alexis.


Re: Making Kafka source respect offset changed externally

2022-07-21 Thread Alexis Sarda-Espinosa
I would suggest updating the documentation to include that statement.

I imagine dynamic partition discovery has no effect on this?

Regards,
Alexis.

On Thu, Jul 21, 2022 at 10:03, Chesnay Schepler <ches...@apache.org> wrote:

> Flink only reads the offsets from Kafka when the job is initially started
> from a clear slate.
> Once checkpoints are involved it only relies on offsets stored in the
> state.
>
> On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:
>
> Hello again,
>
> I just performed a test
> using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). I
> did a few tests in the following order, and I noticed a few weird things.
> Note that our job uses Processing Time windows, so watermarks are
> irrelevant.
>
> 1. After the job had been running for a while, we manually moved the
> consumer group's offset to 12 hours in the past [1] (without restarting the
> job).
>   - After this, the consumer simply stopped reading messages - the
> consumer lag in Kafka stayed at around 150k (no new data arrived)
>
> 2. We restarted the job with a checkpoint.
>   - The consumer lag in Kafka dropped down to 0, but no data was
> emitted from the windows.
>
> 3. We stopped the job, moved the offset again, and restarted without any
> checkpoint/savepoint.
>   - This time the consumer correctly processed the backlog and emitted
> events from the windows.
>
> This was done with Flink 1.15.0.
>
> Is this expected? In other words, if there's a mismatch between Flink's
> state's offset and Kafka's offset, will the job be unable to run?
>
>
>
> [1] The command to move the offset was:
>
> kafka-consumer-groups.sh \
>   --bootstrap-server ... \
>   --topic our-topic \
>   --group our-group \
>   --command-config kafka-preprod.properties \
>   --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
>   --execute
>
> Regards,
> Alexis.
>
> On Thu, Jul 14, 2022 at 22:56, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>
>> Hi Yaroslav,
>>
>> The test I did was just using earliest, I'll test with committed offset
>> again, thanks.
>>
>> Regards,
>> Alexis.
>>
>> On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko, 
>> wrote:
>>
>>> Hi Alexis,
>>>
>>> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
>>> consumer offsets? In this case, it should get the offsets from Kafka and
>>> not the state.
>>>
>>> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Regarding the new Kafka source (configure with a consumer group), I
>>>> found out that if I manually change the group's offset with Kafka's admin
>>>> API independently of Flink (while the job is running), the Flink source
>>>> will ignore that and reset it to whatever it stored internally. Is there
>>>> any way to prevent this?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
>


Re: Making Kafka source respect offset changed externally

2022-07-20 Thread Alexis Sarda-Espinosa
Hello again,

I just performed a test
using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). I
did a few tests in the following order, and I noticed a few weird things.
Note that our job uses Processing Time windows, so watermarks are
irrelevant.

1. After the job had been running for a while, we manually moved the
consumer group's offset to 12 hours in the past [1] (without restarting the
job).
  - After this, the consumer simply stopped reading messages - the consumer
lag in Kafka stayed at around 150k (no new data arrived)

2. We restarted the job with a checkpoint.
  - The consumer lag in Kafka dropped down to 0, but no data was
emitted from the windows.

3. We stopped the job, moved the offset again, and restarted without any
checkpoint/savepoint.
  - This time the consumer correctly processed the backlog and emitted
events from the windows.

This was done with Flink 1.15.0.
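
For completeness, the source is configured roughly like this (a trimmed sketch,
not the exact code):

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("...")
        .setTopics("our-topic")
        .setGroupId("our-group")
        .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
// (imports omitted: KafkaSource/OffsetsInitializer from flink-connector-kafka,
// OffsetResetStrategy from the Kafka client library)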

Is this expected? In other words, if there's a mismatch between Flink's
state's offset and Kafka's offset, will the job be unable to run?



[1] The command to move the offset was:

kafka-consumer-groups.sh \
  --bootstrap-server ... \
  --topic our-topic \
  --group our-group \
  --command-config kafka-preprod.properties \
  --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
  --execute

Regards,
Alexis.

On Thu, Jul 14, 2022 at 22:56, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:

> Hi Yaroslav,
>
> The test I did was just using earliest, I'll test with committed offset
> again, thanks.
>
> Regards,
> Alexis.
>
> On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko, 
> wrote:
>
>> Hi Alexis,
>>
>> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
>> consumer offsets? In this case, it should get the offsets from Kafka and
>> not the state.
>>
>> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Regarding the new Kafka source (configure with a consumer group), I
>>> found out that if I manually change the group's offset with Kafka's admin
>>> API independently of Flink (while the job is running), the Flink source
>>> will ignore that and reset it to whatever it stored internally. Is there
>>> any way to prevent this?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Re: Did the semantics of Kafka's earliest offset change with the new source API?

2022-07-18 Thread Alexis Sarda-Espinosa
Hi David,

thanks for the info. Indeed, I had misunderstood our old configuration: we
didn't use earliest before, we just used the default.

Regards,
Alexis.

On Fri, Jul 15, 2022 at 14:46, David Anderson <dander...@apache.org> wrote:

> What did change was the default starting position when not starting from a
> checkpoint. With FlinkKafkaConsumer, it starts from the committed offsets
> by default. With KafkaSource, it starts from the earliest offset.
>
> David
>
> On Fri, Jul 15, 2022 at 5:57 AM Chesnay Schepler 
> wrote:
>
>> I'm not sure about the previous behavior, but at the very least according
>> to the documentation the behavior is identical.
>>
>> 1.12:
>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>
>> *setStartFromEarliest()** / **setStartFromLatest()**: Start from the
>> earliest / latest record. Under these modes, committed offsets in Kafka
>> will be ignored and not used as starting positions.*
>>
>> On 13/07/2022 18:53, Alexis Sarda-Espinosa wrote:
>>
>> Hello,
>>
>> I have a job running with Flink 1.15.0 that consumes from Kafka with the
>> new KafkaSource API, setting a group ID explicitly and specifying
>> OffsetsInitializer.earliest() as a starting offset. Today I restarted the
>> job ignoring both savepoint and checkpoint, and the consumer started
>> reading from the first available message in the broker (from 24 hours ago),
>> i.e. it completely ignored the offsets that were committed to Kafka. If I
>> use OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>> instead, the problem seems to go away.
>>
>> With the previous FlinkKafkaConsumer, using earliest didn't cause any
>> such issues. Was this changed in the aforementioned way on purpose?
>>
>> Regards,
>> Alexis.
>>
>>
>>


Re: Making Kafka source respect offset changed externally

2022-07-14 Thread Alexis Sarda-Espinosa
Hi Yaroslav,

The test I did was just using earliest, I'll test with committed offset
again, thanks.

Regards,
Alexis.

On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko,  wrote:

> Hi Alexis,
>
> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
> consumer offsets? In this case, it should get the offsets from Kafka and
> not the state.
>
> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Regarding the new Kafka source (configure with a consumer group), I found
>> out that if I manually change the group's offset with Kafka's admin API
>> independently of Flink (while the job is running), the Flink source will
>> ignore that and reset it to whatever it stored internally. Is there any way
>> to prevent this?
>>
>> Regards,
>> Alexis.
>>
>>


Making Kafka source respect offset changed externally

2022-07-14 Thread Alexis Sarda-Espinosa
Hello,

Regarding the new Kafka source (configure with a consumer group), I found
out that if I manually change the group's offset with Kafka's admin API
independently of Flink (while the job is running), the Flink source will
ignore that and reset it to whatever it stored internally. Is there any way
to prevent this?

Regards,
Alexis.


Did the semantics of Kafka's earliest offset change with the new source API?

2022-07-13 Thread Alexis Sarda-Espinosa
Hello,

I have a job running with Flink 1.15.0 that consumes from Kafka with the
new KafkaSource API, setting a group ID explicitly and specifying
OffsetsInitializer.earliest() as a starting offset. Today I restarted the
job ignoring both savepoint and checkpoint, and the consumer started
reading from the first available message in the broker (from 24 hours ago),
i.e. it completely ignored the offsets that were committed to Kafka. If I
use OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
instead, the problem seems to go away.

With the previous FlinkKafkaConsumer, using earliest didn't cause any such
issues. Was this changed in the aforementioned way on purpose?

Regards,
Alexis.


RE: Schema Evolution of POJOs fails on Field Removal

2022-05-18 Thread Alexis Sarda-Espinosa
Hi David,

Please refer to https://issues.apache.org/jira/browse/FLINK-21752

Regards,
Alexis.

-Original Message-
From: David Jost  
Sent: Wednesday, May 18, 2022 15:07
To: user@flink.apache.org
Subject: Schema Evolution of POJOs fails on Field Removal

Hi,

we currently have an issue where our job fails to restart from a savepoint after 
we removed a field from a serialised (POJO) class. According to [0], this kind of 
evolution is supported, but it sadly only works when adding fields, not when 
removing them.

I was hoping someone here might be able to help or have a pointer on where to 
continue the search for a solution.

Thank you in advance.

Best
  David


RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-05-02 Thread Alexis Sarda-Espinosa
Ok

Regards,
Alexis.

From: Peter Brucia 
Sent: Friday, April 22, 2022 15:31
To: Alexis Sarda-Espinosa 
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

No
Sent from my iPhone



Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-22 Thread Alexis Sarda-Espinosa
Hi David,

I don't find it troublesome per se, I was rather trying to understand what 
should be expected (and documented) for my application. Before I restarted the 
job and changed some configurations, it ran for around 10 days and ended up 
with a state size of about 1.8GB, so I'm still not sure what the upper bound in my 
scenario is, or whether that amount of "uncompacted garbage" is normal 
(for our throughput). This is important for us because we need to know how to 
size (disk space) the infrastructure, although it is also having a big impact 
on timings because each checkpoint ends up requiring 30+ seconds to complete, 
and they will eventually time out for sure.

I understand RocksDB has different sophisticated mechanisms, so I certainly 
don't expect one magic button that does exactly what I want, but ideally there 
would be a way to tune configuration in a way that a rough upper bound estimate 
of disk space can be deduced. Having some expired state for a while is 
expected, what I find odd is that it grows so fast, the size of the state 
quickly outpaces the size of processed events, even though the state only 
persists a subset of information (some integer ids, string ids, longs for 
epochs).

At this point I think I can conclude that the "live" state from my operators is 
not growing indefinitely (based on what I see with the state processor API), so 
is there a way to get a better estimate of disk utilization other than letting 
the job run and wait? I've been reading through RocksDB documentation as well, 
but that might not be enough because I don't know how Flink handles its own 
framework state internally.

Regards,
Alexis.


From: David Anderson 
Sent: Friday, April 22, 2022 9:57 AM
To: Alexis Sarda-Espinosa 
Cc: ro...@apache.org ; user@flink.apache.org 

Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Alexis,

Compaction isn't an all-at-once procedure. RocksDB is organized as a series of 
levels, each 10x larger than the one below. There are a few different 
compaction algorithms available, and they are tunable, but what's typically 
happening during compaction is that one SST file at level n is being merged 
into the relevant SST files at level n+1. During this compaction procedure, 
obsolete and deleted entries are cleaned up. And several such compactions can 
be occurring concurrently. (Not to mention that each TM has its own independent 
RocksDB instance.)

It's not unusual for jobs with a small amount of state to end up with 
checkpoints of a few hundred MBs in size, where a lot of that is uncompacted 
garbage. If you find this troublesome, you could configure RocksDB to compact 
more frequently.

David

On Thu, Apr 21, 2022 at 12:49 PM Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 wrote:
Hello,

I enabled some of the RocksDB metrics and I noticed some additional things. 
After changing the configuration YAML, I restarted the cluster with a 
savepoint, and I can see that it only used 5.6MB on disk. Consequently, after 
the job switched to running state, the new checkpoints were also a few MB in 
size. After running for 1 day, checkpoint size is now around 100MB. From the 
metrics I can see with the Prometheus reporter:

- All entries for num-live-versions show 1
- All entries for compaction-pending show 0
- Most entries for estimate-num-keys are in the range of 0 to 100, although I 
see a few with 151 coming from 
flink_taskmanager_job_task_operator__timer_state_event_window_timers_rocksdb_estimate_num_keys

Is compaction expected after only 100MB? I imagine not, but if the savepoint 
shows that the effective amount of data is so low, size growth still seems far 
too large. In fact, if I only look at the UI, Bytes Received for the relevant 
SubTasks is about 14MB, yet the latest checkpoint already shows a Data Size of 
75MB for said SubTasks.

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan mailto:ro...@apache.org>>
Sent: Mittwoch, 20. April 2022 10:37
To: Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

State Processor API works on a higher level and is not aware of any RocksDB 
specifics (in fact, it can be used with any backend).

Regards,
Roman

On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa

mailto:alexis.sarda-espin...@microfocus.com>>
 wrote:
>
> I can look into RocksDB metrics, I need to configure Prometheus at some point 
> anyway. However, going back to the original question, is there no way to gain 
> more insight into this with the state processor API? You've mentioned 
> potential issues (too many states, missing compaction) but, with my 
> admittedly limited understanding of the way RocksDB is used in Flink, 

RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-21 Thread Alexis Sarda-Espinosa
Hello,

I enabled some of the RocksDB metrics and I noticed some additional things. 
After changing the configuration YAML, I restarted the cluster with a 
savepoint, and I can see that it only used 5.6MB on disk. Consequently, after 
the job switched to running state, the new checkpoints were also a few MB in 
size. After running for 1 day, checkpoint size is now around 100MB. From the 
metrics I can see with the Prometheus reporter:

- All entries for num-live-versions show 1
- All entries for compaction-pending show 0
- Most entries for estimate-num-keys are in the range of 0 to 100, although I 
see a few with 151 coming from 
flink_taskmanager_job_task_operator__timer_state_event_window_timers_rocksdb_estimate_num_keys
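
(For reference, these come from enabling roughly the following options, plus the
Prometheus reporter - I'm quoting the keys from memory:)

state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.num-live-versions: true
state.backend.rocksdb.metrics.compaction-pending: true
state.backend.rocksdb.metrics.num-running-compactions: true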

Is compaction expected after only 100MB? I imagine not, but if the savepoint 
shows that the effective amount of data is so low, size growth still seems far 
too large. In fact, if I only look at the UI, Bytes Received for the relevant 
SubTasks is about 14MB, yet the latest checkpoint already shows a Data Size of 
75MB for said SubTasks.

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Wednesday, April 20, 2022 10:37
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

State Processor API works on a higher level and is not aware of any RocksDB 
specifics (in fact, it can be used with any backend).

Regards,
Roman

On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa

 wrote:
>
> I can look into RocksDB metrics, I need to configure Prometheus at some point 
> anyway. However, going back to the original question, is there no way to gain 
> more insight into this with the state processor API? You've mentioned 
> potential issues (too many states, missing compaction) but, with my 
> admittedly limited understanding of the way RocksDB is used in Flink, I would 
> have thought that such things would be visible when using the state 
> processor. Is there no way for me to "parse" those MANIFEST files with some 
> of Flink's classes and get some more hints?
>
> Regards,
> Alexis.
>
> 
> From: Roman Khachatryan 
> Sent: Tuesday, April 19, 2022 5:51 PM
> To: Alexis Sarda-Espinosa 
> Cc: user@flink.apache.org 
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
> > I assume that when you say "new states", that is related to new descriptors 
> > with different names? Because, in the case of windowing for example, each 
> > window "instance" has its own scoped (non-global and keyed) state, but 
> > that's not regarded as a separate column family, is it?
> Yes, that's what I meant, and that's regarded as the same column family.
>
> Another possible reason is that SST files aren't being compacted and 
> that increases the MANIFEST file size.
> I'd check the total number of loaded SST files and the creation date 
> of the oldest one.
>
> You can also see whether there are any compactions running via RocksDB 
> metrics [1] [2] (a reporter needs to be configured [3]).
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-backend-rocksdb-metrics-num-running-compactions
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-backend-rocksdb-metrics-compaction-pending
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/m
> etric_reporters/#reporters
>
> Regards,
> Roman
>
> On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa 
>  wrote:
> >
> > Hi Roman,
> >
> > I assume that when you say "new states", that is related to new descriptors 
> > with different names? Because, in the case of windowing for example, each 
> > window "instance" has its own scoped (non-global and keyed) state, but 
> > that's not regarded as a separate column family, is it?
> >
> > For the 3 descriptors I mentioned before, they are only instantiated once 
> > and used like this:
> >
> > - Window list state: each call to process() executes 
> > context.windowState().getListState(...).get()
> > - Global map state: each call to process() executes 
> > context.globalState().getMapState(...)
> > - Global list state: within open(), runtimeContext.getListState(...) is 
> > executed once and used throughout the life of the operator.
> >
> > According to [1], the two ways of using global state should be equivalent.
> >
> > I will say that some of the operators instantiate the state descriptor in 
> > their constructors, i.e. before they are serialized to the TM, but the 
> > descriptors are Serializabl

Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-19 Thread Alexis Sarda-Espinosa
I can look into RocksDB metrics, I need to configure Prometheus at some point 
anyway. However, going back to the original question, is there no way to gain 
more insight into this with the state processor API? You've mentioned potential 
issues (too many states, missing compaction) but, with my admittedly limited 
understanding of the way RocksDB is used in Flink, I would have thought that 
such things would be visible when using the state processor. Is there no way 
for me to "parse" those MANIFEST files with some of Flink's classes and get 
some more hints?

Regards,
Alexis.


From: Roman Khachatryan 
Sent: Tuesday, April 19, 2022 5:51 PM
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org 
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

> I assume that when you say "new states", that is related to new descriptors 
> with different names? Because, in the case of windowing for example, each 
> window "instance" has its own scoped (non-global and keyed) state, but that's 
> not regarded as a separate column family, is it?
Yes, that's what I meant, and that's regarded as the same column family.

Another possible reason is that SST files aren't being compacted and
that increases the MANIFEST file size.
I'd check the total number of loaded SST files and the creation date
of the oldest one.

You can also see whether there are any compactions running via RocksDB
metrics [1] [2] (a reporter needs to be configured [3]).

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-num-running-compactions
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-compaction-pending
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#reporters

Regards,
Roman

On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa
 wrote:
>
> Hi Roman,
>
> I assume that when you say "new states", that is related to new descriptors 
> with different names? Because, in the case of windowing for example, each 
> window "instance" has its own scoped (non-global and keyed) state, but that's 
> not regarded as a separate column family, is it?
>
> For the 3 descriptors I mentioned before, they are only instantiated once and 
> used like this:
>
> - Window list state: each call to process() executes 
> context.windowState().getListState(...).get()
> - Global map state: each call to process() executes 
> context.globalState().getMapState(...)
> - Global list state: within open(), runtimeContext.getListState(...) is 
> executed once and used throughout the life of the operator.
>
> According to [1], the two ways of using global state should be equivalent.
>
> I will say that some of the operators instantiate the state descriptor in 
> their constructors, i.e. before they are serialized to the TM, but the 
> descriptors are Serializable, so I imagine that's not relevant.
>
> [1] https://stackoverflow.com/a/50510054/5793905
>
> Regards,
> Alexis.
>
> -Original Message-
> From: Roman Khachatryan 
> Sent: Tuesday, April 19, 2022 11:48
> To: Alexis Sarda-Espinosa 
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state 
> processor API
>
> Hi Alexis,
>
> Thanks a lot for the information,
>
> MANIFEST files list RocksDB column families (among other info); ever growing 
> size of these files might indicate that some new states are constantly being 
> created.
> Could you please confirm that the number of state names is constant?
>
> > Could you confirm if Flink's own operators could be creating state in 
> > RocksDB? I assume the window operators save some information in the state 
> > as well.
> That's correct, window operators maintain a list of elements per window and a 
> set of timers (timestamps). These states' names should be fixed (something 
> like "window-contents" and "window-timers").
>
> > is that related to managed state used by my functions? Or does that 
> > indicate size growth is elsewhere?
> The same mechanism is used for both Flink internal state and operator state, 
> so it's hard to say without at least knowing the state names.
>
>
> Regards,
> Roman
>
>
> On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan  wrote:
> >
> > /shared folder contains keyed state that is shared among different
> > checkpoints [1]. Most of state should be shared in your case since
> > you're using keyed state and incremental checkpoints.
> >
> > When a checkpoint is loaded, the state that it shares with older
> > checkpoints is loaded as well. I suggest

RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-19 Thread Alexis Sarda-Espinosa
Hi Roman,

I assume that when you say "new states", that is related to new descriptors 
with different names? Because, in the case of windowing for example, each 
window "instance" has its own scoped (non-global and keyed) state, but that's 
not regarded as a separate column family, is it?

For the 3 descriptors I mentioned before, they are only instantiated once and 
used like this:

- Window list state: each call to process() executes 
context.windowState().getListState(...).get()
- Global map state: each call to process() executes 
context.globalState().getMapState(...)
- Global list state: within open(), runtimeContext.getListState(...) is 
executed once and used throughout the life of the operator.

According to [1], the two ways of using global state should be equivalent.

I will say that some of the operators instantiate the state descriptor in their 
constructors, i.e. before they are serialized to the TM, but the descriptors 
are Serializable, so I imagine that's not relevant.

[1] https://stackoverflow.com/a/50510054/5793905

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Tuesday, April 19, 2022 11:48
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Alexis,

Thanks a lot for the information,

MANIFEST files list RocksDB column families (among other info); ever growing 
size of these files might indicate that some new states are constantly being 
created.
Could you please confirm that the number of state names is constant?

> Could you confirm if Flink's own operators could be creating state in 
> RocksDB? I assume the window operators save some information in the state as 
> well.
That's correct, window operators maintain a list of elements per window and a 
set of timers (timestamps). These states' names should be fixed (something like 
"window-contents" and "window-timers").

> is that related to managed state used by my functions? Or does that indicate 
> size growth is elsewhere?
The same mechanism is used for both Flink internal state and operator state, so 
it's hard to say without at least knowing the state names.


Regards,
Roman


On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan  wrote:
>
> /shared folder contains keyed state that is shared among different 
> checkpoints [1]. Most of state should be shared in your case since 
> you're using keyed state and incremental checkpoints.
>
> When a checkpoint is loaded, the state that it shares with older 
> checkpoints is loaded as well. I suggested to load different 
> checkpoints (i.e. chk-* folders) and compare the numbers of objects in 
> their states. To prevent the job from discarding the state, it can 
> either be stopped for some time and then restarted from the latest 
> checkpoint; or the number of retained checkpoints can be increased 
> [2]. Copying isn't necessary.
>
> Besides that, you can also check state sizes of operator in Flink Web 
> UI (but not the sizes of individual states). If the operators are 
> chained then their combined state size will be shown. To prevent this, 
> you can disable chaining [3] (although this will have performance 
> impact).
>
> Individual checkpoint folders should be eventually removed (when the 
> checkpoint is subsumed). However, this is not guaranteed: if there is 
> any problem during deletion, it will be logged, but the job will not 
> fail.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/ch
> eckpoints/#directory-structure
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-checkpoints-num-retained
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastre
> am/operators/overview/#disable-chaining
>
> Regards,
> Roman
>
> On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
>  wrote:
> >
> > Hi Roman,
> >
> > Maybe I'm misunderstanding the structure of the data within the checkpoint. 
> > You suggest comparing counts of objects in different checkpoints, I assume 
> > you mean copying my "checkpoints" folder at different times and comparing, 
> > not comparing different "chk-*" folders in the same snapshot, right?
> >
> > I haven't executed the processor program with a newer checkpoint, but I did 
> > look at the folder in the running system, and I noticed that most of the 
> > chk-* folders have remained unchanged, there's only 1 or 2 new folders 
> > corresponding to newer checkpoints. I would think this makes sense since 
> > the configuration specifies that only 1 completed checkpoint should be 
> > retained, but then why are the older chk-* folders still there? I did 
> > trigger a manual r

RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-14 Thread Alexis Sarda-Espinosa
Hello,

There was a network issue in my environment and the job had to restart. After 
the job came back up, the logs showed a lot of lines like this:

RocksDBIncrementalRestoreOperation ... Starting to restore from state handle: 
...

Interestingly, those entries include information about sizes in bytes:

- 
445163.sst=ByteStreamStateHandle{handleName='file:/opt/flink/state/checkpoints//shared/18f95afa-dc66-467d-bd05-779895f24960',
 dataBytes=1328}
- privateState={MANIFEST-04=File State: 
file:/opt/flink/state/checkpoints//shared/bd7fde24-3ef6-4e05-bbd6-1474f8051d5d
 [80921331 bytes]

I extracted a lot of that information and I can see that:

- If I sum all dataBytes from sharedState, that only accounts for a couple MB.
- Most of the state comes from privateState, specifically from the entries 
referring to MANIFEST File State; that accounts for almost 1.5GB.

I believe that is one of the files RocksDB uses internally, but is that related 
to managed state used by my functions? Or does that indicate size growth is 
elsewhere?

Regards,
Alexis.

-Original Message-
From: Alexis Sarda-Espinosa  
Sent: Tuesday, April 12, 2022 15:39
To: ro...@apache.org
Cc: user@flink.apache.org
Subject: RE: RocksDB's state size discrepancy with what's seen with state 
processor API

Thanks for all the pointers. The UI does show combined state for a chain, but 
the only state descriptors inside that chain are the 3 I mentioned before. Its 
size is still increasing today, and duration is now around 30 seconds (I can't 
use unaligned checkpoints because I use partitionCustom).

I've executed the state processor program for all of the 50 chk-* folders, but 
I don't see anything weird. The counts go up and down depending on which one I 
load, but even the bigger ones have around 500-700 entries, which should only 
be a couple hundred KB; it's not growing monotonically.

The chain of operators is relatively simple:

timestampedStream = inputStream -> keyBy -> assignTimestampsAndWatermarks
windowedStream  = timestampedStream -> reinterpretAsKeyedStream -> 
window (SlidingEventTimeWindows)
windowedStream -> process1 -> sink1
windowedStream -> process2 -> sink2
windowedStream -> process3 -> map

And according to the Low Watermark I see in the UI, event time is advancing 
correctly.

Could you confirm if Flink's own operators could be creating state in RocksDB? 
I assume the window operators save some information in the state as well.

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan 
Sent: Tuesday, April 12, 2022 14:06
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

/shared folder contains keyed state that is shared among different checkpoints 
[1]. Most of state should be shared in your case since you're using keyed state 
and incremental checkpoints.

When a checkpoint is loaded, the state that it shares with older checkpoints is 
loaded as well. I suggested to load different checkpoints (i.e. chk-* folders) 
and compare the numbers of objects in their states. To prevent the job from 
discarding the state, it can either be stopped for some time and then restarted 
from the latest checkpoint; or the number of retained checkpoints can be 
increased [2]. Copying isn't necessary.

Besides that, you can also check state sizes of operator in Flink Web UI (but 
not the sizes of individual states). If the operators are chained then their 
combined state size will be shown. To prevent this, you can disable chaining 
[3] (although this will have performance impact).

Individual checkpoint folders should be eventually removed (when the checkpoint 
is subsumed). However, this is not guaranteed: if there is any problem during 
deletion, it will be logged, but the job will not fail.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining

Regards,
Roman

On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
 wrote:
>
> Hi Roman,
>
> Maybe I'm misunderstanding the structure of the data within the checkpoint. 
> You suggest comparing counts of objects in different checkpoints, I assume 
> you mean copying my "checkpoints" folder at different times and comparing, 
> not comparing different "chk-*" folders in the same snapshot, right?
>
> I haven't executed the processor program with a newer checkpoint, but I did 
> look at the folder in the running system, and I noticed that most of the 
> chk-* folders have remained unchanged, there's only 1 or 2 new folders 
> corresp

RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-12 Thread Alexis Sarda-Espinosa
Thanks for all the pointers. The UI does show combined state for a chain, but
the only state descriptors inside that chain are the 3 I mentioned before. That
combined state size is still increasing today, and the checkpoint duration is
now around 30 seconds (I can't use unaligned checkpoints because I use
partitionCustom).
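
For context, the checkpointing setup is along the lines of the sketch below;
the interval, checkpoint storage path and retention mode shown here are
placeholders rather than the real values.

// Sketch only - every concrete value below is a placeholder.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // RocksDB with incremental checkpoints
env.enableCheckpointing(60_000L);                           // checkpoint interval in ms
env.getCheckpointConfig().setCheckpointStorage("file:///path/to/checkpoints");
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);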

I've executed the state processor program for all 50 chk-* folders, but I
don't see anything weird. The counts go up and down depending on which one I
load, but even the bigger ones have around 500-700 entries, which should only
amount to a couple hundred KB; the counts are not growing monotonically.
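
For anyone who wants to reproduce this kind of check: the counting can be done
with the state processor API roughly as in the sketch below. This is not the
actual program; the operator uid ("window-uid"), the key type, the value type
and the state name ("my-state") are placeholders.

// Rough sketch: count the entries of one keyed state in a retained checkpoint.
// Requires the flink-state-processor-api module.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class CountStateEntries {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Point this at a single chk-* folder (it must contain the _metadata file).
        ExistingSavepoint checkpoint = Savepoint.load(
                env,
                "file:///path/to/checkpoints/<job-id>/chk-42",
                new EmbeddedRocksDBStateBackend());

        DataSet<String> keys = checkpoint.readKeyedState(
                "window-uid",
                new KeyedStateReaderFunction<String, String>() {

                    private ValueState<Long> state;

                    @Override
                    public void open(Configuration parameters) {
                        // Must match the descriptor used by the job (placeholder here).
                        state = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("my-state", Long.class));
                    }

                    @Override
                    public void readKey(String key, Context ctx, Collector<String> out) {
                        // Emit one record per key; the registered state handle could
                        // also be inspected here via state.value().
                        out.collect(key);
                    }
                });

        System.out.println("entries: " + keys.count());
    }
}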

The chain of operators is relatively simple:

timestampedStream = inputStream -> keyBy -> assignTimestampsAndWatermarks
windowedStream    = timestampedStream -> reinterpretAsKeyedStream -> window(SlidingEventTimeWindows)
windowedStream -> process1 -> sink1
windowedStream -> process2 -> sink2
windowedStream -> process3 -> map
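
In DataStream API terms, that corresponds roughly to the sketch below; the
event class, key selector, watermark strategy, window sizes, process functions
and sinks are all placeholders rather than the real ones (DataStreamUtils and
SlidingEventTimeWindows come from flink-streaming-java).

// Rough sketch of the topology above - every name and duration is a placeholder.
DataStream<MyEvent> timestampedStream = inputStream
        .keyBy(MyEvent::getKey)
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, ts) -> event.getTimestamp()));

// assignTimestampsAndWatermarks loses the keyed property, hence reinterpretAsKeyedStream.
WindowedStream<MyEvent, String, TimeWindow> windowedStream =
        DataStreamUtils.reinterpretAsKeyedStream(timestampedStream, MyEvent::getKey)
                .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));

windowedStream.process(new Process1()).addSink(sink1);
windowedStream.process(new Process2()).addSink(sink2);
windowedStream.process(new Process3()).map(new MyMapper());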

And according to the Low Watermark I see in the UI, event time is advancing 
correctly.

Could you confirm whether Flink's own operators could be creating state in
RocksDB? I assume the window operators store some information in state as well.

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Dienstag, 12. April 2022 14:06
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

The /shared folder contains keyed state that is shared among different
checkpoints [1]. Most of the state should be shared in your case, since you're
using keyed state and incremental checkpoints.
When a checkpoint is loaded, the state that it shares with older checkpoints is
loaded as well. I suggested loading different checkpoints (i.e. the chk-*
folders) and comparing the number of objects in their states. To prevent the
job from discarding the state, it can either be stopped for some time and then
restarted from the latest checkpoint, or the number of retained checkpoints can
be increased [2]. Copying isn't necessary.
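
For reference, that is the following setting (the value below is only an
example):

# flink-conf.yaml
state.checkpoints.num-retained: 10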

Besides that, you can also check the state size of each operator in the Flink
Web UI (but not the sizes of individual states). If the operators are chained,
their combined state size will be shown. To prevent this, you can disable
chaining [3] (although this will have a performance impact).
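
A minimal sketch of what that looks like on one of the window operators,
reusing the placeholder names from the pipeline sketched above:

// Disabling chaining gives the operator its own node in the job graph, so the
// Web UI reports its state size separately. Sketch only.
windowedStream
        .process(new Process1())
        .disableChaining()
        .addSink(sink1);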

Individual checkpoint folders should eventually be removed (when the checkpoint
is subsumed). However, this is not guaranteed: if there is any problem during
deletion, it will be logged, but the job will not fail.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining

Regards,
Roman

On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
 wrote:
>
> Hi Roman,
>
> Maybe I'm misunderstanding the structure of the data within the checkpoint.
> You suggest comparing counts of objects in different checkpoints; I assume
> you mean copying my "checkpoints" folder at different times and comparing
> those, not comparing the different "chk-*" folders within the same snapshot, right?
>
> I haven't executed the processor program with a newer checkpoint, but I did
> look at the folder in the running system, and I noticed that most of the
> chk-* folders have remained unchanged; there are only 1 or 2 new folders
> corresponding to newer checkpoints. I would think this makes sense, since
> the configuration specifies that only 1 completed checkpoint should be
> retained, but then why are the older chk-* folders still there? I did
> trigger a manual restart of the Flink cluster in the past (before starting
> the long-running test), but if my restore policy is CLAIM, Flink's
> documentation states that the old checkpoints would be cleaned up eventually.
>
> Moreover, just by looking at folder sizes with "du", I can see that most of
> the state is held in the "shared" folder, and that has grown for sure; I'm
> not sure what "shared" usually holds, but if that's what's growing, maybe I
> can rule out expired state staying around? My pipeline doesn't use timers,
> although I guess Flink itself may use them. Is there any way I could get
> some insight into which operator holds the larger states?
>
> Regards,
> Alexis.
>
> -Original Message-
> From: Roman Khachatryan 
> Sent: Dienstag, 12. April 2022 12:37
> To: Alexis Sarda-Espinosa 
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
> Hi Alexis,
>
> Thanks a lot for sharing this. I think the program is correct.
> Although it doesn'
