Re: JDBCInputFormat and SplitDataProperties

2018-08-09 Thread Alexis Sarda
Hi Fabian,

Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
declares javaSet as private[flink], so I cannot access it directly.
Nevertheless, I managed to get around it by using the java environment:

val env = org.apache.flink.api.java.ExecutionEnvironment.
getExecutionEnvironment

val inputFormat = getInputFormat(query, dbUrl, properties)
val outputFormat = getOutputFormat(dbUrl, properties)

val source = env.createInput(inputFormat)
val sdp = source.getSplitDataProperties
sdp.splitsPartitionedBy(0)
sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))

// transform java DataSet to scala DataSet...
new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
  .groupBy(0, 1)
  .combineGroup(groupCombiner)
  .withForwardedFields("f0->_1")
  .groupBy(0, 1)
  .reduceGroup(groupReducer)
  .withForwardedFields("_1")
  .output(outputFormat)

It seems to work well, and the semantic annotation does remove a hash
partition from the execution plan.
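
For reference, the same kind of hint can also be declared with a class-level
semantic annotation instead of withForwardedFields(); a minimal sketch, assuming
a hypothetical map function (the class name and field mapping below are only
placeholders, not the actual job code):

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
import org.apache.flink.types.Row

// declares that field 0 of the input Row is copied unchanged into field _1 of the output tuple
@ForwardedFields(Array("f0->_1"))
class HostPassThrough extends MapFunction[Row, (String, Int)] {
  override def map(value: Row): (String, Int) =
    (value.getField(0).asInstanceOf[String], 1)
}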

Regards,
Alexis.


On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske  wrote:

> Hi Alexis,
>
> The Scala API does not expose a DataSource object but only a Scala DataSet
> which wraps the Java object.
> You can get the SplitDataProperties from the Scala DataSet as follows:
>
> val dbData: DataSet[...] = ???
> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>
> So you first have to get the wrapped Java DataSet, cast it to DataSource
> and then get the properties.
> It's not very nice, but should work.
>
> In order to use SDPs, you should be a bit familiar with how physical data
> properties are propagated and discarded in the optimizer.
> For example, applying a simple MapFunction removes all properties because
> the function might have changed the fields on which a DataSet is
> partitioned or sorted.
> You can expose the behavior of a function to the optimizer by using
> Semantic Annotations [1]
>
> Some comments on the code and plan you shared:
> - You might want to add hostname to ORDER BY to have the output grouped by
> (ts, hostname).
> - Check the Global and Local data properties in the plan to validate that
> the SDP were correctly interpreted.
> - If the data is already correctly partitioned and sorted, you might not
> need the Combiners. In either case, you probably want to annotate them with
> forwarded fields annotations.
>
> The number of source tasks is unrelated to the number of splits. If you
> have more tasks than splits, some tasks won't process any data.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>
>
> 2018-08-08 14:10 GMT+02:00 Alexis Sarda :
>
>> Hi Fabian,
>>
>> Thanks for the clarification. I have a few remarks, but let me provide
>> more concrete information. You can find the query I'm using, the
>> JDBCInputFormat creation, and the execution plan in this github gist:
>>
>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>
>> I cannot call getSplitDataProperties because env.createInput(inputFormat)
>> returns a DataSet, not a DataSource. In the code, I do this instead:
>>
>> val javaEnv =
>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
>> "example")
>>
>> which feels wrong (the constructor doesn't accept a Scala environment).
>> Is there a better alternative?
>>
>> I see absolutely no difference in the execution plan whether I use SDP or
>> not, so therefore the results are indeed the same. Is this expected?
>>
>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>> that the constructor for GenericInputSplit takes two parameters:
>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>> 2 splits divided into 24 partitions?
>>
>> Regards,
>> Alexis.
>>
>>
>>
>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske  wrote:
>>
>>> Hi Alexis,
>>>
>>> First of all, I think you can leverage the partitioning and sorting
>>> properties of the data returned by the database using SplitDataProperties.
>>> However, please be aware that SplitDataProperties are a rather
>>> experimental feature.
>>>
>>> If used without query parameters, the JDBCInputFormat generates a single
>>> split and queries the database just once. If you want to leverage
>>> parallelism, you have to specify a query with parameters in the WHERE
>>> clause to read different parts of the 

JDBCInputFormat and SplitDataProperties

2018-08-07 Thread Alexis Sarda
Hi everyone,

I have the following scenario: I have a database table with 3 columns: a
host (string), a timestamp, and an integer ID. Conceptually, what I'd like
to do is:

group by host and timestamp -> based on all the IDs in each group, create a
mapping to n new tuples -> for each unique tuple, count how many times it
appeared across the resulting data

Each new tuple has 3 fields: the host, a new ID, and an Integer=1

What I'm currently doing is roughly:

val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
val source = environment.createInput(input)
source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
1).aggregate(SUM, 2)

The query given to JDBCInputFormat provides results ordered by host and
timestamp, and I was wondering if performance can be improved by specifying
this in the code. I've looked at
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
and
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
but I still have some questions:

- If a split is a subset of a partition, what is the meaning of
SplitDataProperties#splitsPartitionedBy? The wording makes me think that a
split is divided into partitions, meaning that a partition would be a
subset of a split.
- At which point can I retrieve and adjust a SplitDataProperties instance,
if possible at all?
- If I wanted a coarser parallelization where each slot gets all the data
for the same host, would I have to manually create the sub-groups based on
timestamp?

Regards,
Alexis.


Re: JDBCInputFormat and SplitDataProperties

2018-08-08 Thread Alexis Sarda
Hi Fabian,

Thanks for the clarification. I have a few remarks, but let me provide more
concrete information. You can find the query I'm using, the JDBCInputFormat
creation, and the execution plan in this github gist:

https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d

I cannot call getSplitDataProperties because env.createInput(inputFormat)
returns a DataSet, not a DataSource. In the code, I do this instead:

val javaEnv =
org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
"example")

which feels wrong (the constructor doesn't accept a Scala environment). Is
there a better alternative?

I see absolutely no difference in the execution plan whether I use SDP or
not, so therefore the results are indeed the same. Is this expected?

My ParameterValuesProvider specifies 2 splits, yet the execution plan shows
Parallelism=24. Even the source code is a bit ambiguous, considering that
the constructor for GenericInputSplit takes two parameters: partitionNumber
and totalNumberOfPartitions. Should I assume that there are 2 splits
divided into 24 partitions?

Regards,
Alexis.



On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske  wrote:

> Hi Alexis,
>
> First of all, I think you can leverage the partitioning and sorting properties
> of the data returned by the database using SplitDataProperties.
> However, please be aware that SplitDataProperties are a rather
> experimental feature.
>
> If used without query parameters, the JDBCInputFormat generates a single
> split and queries the database just once. If you want to leverage
> parallelism, you have to specify a query with parameters in the WHERE
> clause to read different parts of the table.
> Note, depending on the configuration of the database, multiple queries
> result in multiple full scans. Hence, it might make sense to have an index
> on the partitioning columns.
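>
> For illustration, a minimal sketch of such a parameterized input format (the
> driver, URL, query and column types below are placeholders, not your actual
> configuration):
>
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
> import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider
> import org.apache.flink.api.java.typeutils.RowTypeInfo
>
> val rowTypeInfo = new RowTypeInfo(
>   BasicTypeInfo.STRING_TYPE_INFO,  // host
>   BasicTypeInfo.LONG_TYPE_INFO,    // timestamp
>   BasicTypeInfo.INT_TYPE_INFO)     // id
>
> // one parameter set per split -> two splits, each reading one part of the table
> val splitParameters: Array[Array[java.io.Serializable]] =
>   Array(Array("first"), Array("second"))
>
> val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
>   .setDrivername("org.postgresql.Driver")
>   .setDBUrl("jdbc:postgresql://...")
>   .setQuery("SELECT host, ts, id FROM events WHERE host = ? ORDER BY ts")
>   .setRowTypeInfo(rowTypeInfo)
>   .setParametersProvider(new GenericParameterValuesProvider(splitParameters))
>   .finish()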
>
> If properly configured, the JDBCInputFormat generates multiple splits
> which are partitioned. Since the partitioning is encoded in the query, it
> is opaque to Flink and must be explicitly declared.
> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
> Flink that all records with the same value in the partitioning field are
> read from the same split, i.e., the full data is partitioned on the
> attribute across splits.
> The same can be done for ordering if the query of the JDBCInputFormat is
> specified with an ORDER BY clause.
> Partitioning and grouping are two different things. You can define a query
> that partitions on hostname and orders by hostname and timestamp and
> declare these properties in the SDP.
>
> You can get a SDP object by calling DataSource.getSplitDataProperties().
> In your example this would be source.getSplitDataProperties().
>
> Whatever you do, you should carefully check the execution plan
> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
> validate that the results are identical whether you use SDP or not.
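>
> A minimal sketch of that check (the actual job definition is elided here):
>
> import org.apache.flink.api.scala.ExecutionEnvironment
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> // ... define the sources, transformations and sinks of the job ...
> // print the JSON plan and paste it into the plan visualizer [1]
> println(env.getExecutionPlan())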
>
> Best, Fabian
>
> [1] https://flink.apache.org/visualizer/
>
> 2018-08-07 22:32 GMT+02:00 Alexis Sarda :
>
>> Hi everyone,
>>
>> I have the following scenario: I have a database table with 3 columns: a
>> host (string), a timestamp, and an integer ID. Conceptually, what I'd like
>> to do is:
>>
>> group by host and timestamp -> based on all the IDs in each group, create
>> a mapping to n new tuples -> for each unique tuple, count how many times it
>> appeared across the resulting data
>>
>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>
>> What I'm currently doing is roughly:
>>
>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>> val source = environment.createInput(input)
>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
>> 1).aggregate(SUM, 2)
>>
>> The query given to JDBCInputFormat provides results ordered by host and
>> timestamp, and I was wondering if performance can be improved by specifying
>> this in the code. I've looked at
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>> and
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>> but I still have some questions:
>>
>> - If a split is a subset of a partition, what is the meaning of
>> SplitDataProperties#splitsPartitionedBy? The wording makes me think that a
>> split is divided into partitions, meaning that a partition would be a
>> subset of a split.
>> - At which point can I retrieve and adjust a SplitDataProperties
>> instance, if possible at all?
>> - If I wanted a coarser parallelization where each slot gets all the data
>> for the same host, would I have to manually create the sub-groups based on
>> timestamp?
>>
>> Regards,
>> Alexis.
>>
>>
>


Re: JDBCInputFormat and SplitDataProperties

2018-08-10 Thread Alexis Sarda
It seems I may have spoken too soon. After executing the job with more
data, I can see the following things in the Flink dashboard:

- The first subtask is a chained DataSource -> GroupCombine. Even with
parallelism set to 24 and a ParameterValuesProvider returning
Array(Array("first"), Array("second")), only 1 thread processed all records.
- The second subtask is a Sorted Group Reduce, and I see two weird things:
  + The first subtask sent 5,923,802 records, yet the second subtask only
received 5,575,154 records?
  + Again, everything was done in a single thread, even though a groupBy
was used.
- The third and final subtask is a sink that saves back to the database.

Does anyone know why parallelism is not being used?

Regards,
Alexis.


On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda  wrote:

> Hi Fabian,
>
> Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
> declares javaSet as private[flink], so I cannot access it directly.
> Nevertheless, I managed to get around it by using the java environment:
>
> val env = org.apache.flink.api.java.ExecutionEnvironment.
> getExecutionEnvironment
>
> val inputFormat = getInputFormat(query, dbUrl, properties)
> val outputFormat = getOutputFormat(dbUrl, properties)
>
> val source = env.createInput(inputFormat)
> val sdp = source.getSplitDataProperties
> sdp.splitsPartitionedBy(0)
> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>
> // transform java DataSet to scala DataSet...
> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>   .groupBy(0, 1)
>   .combineGroup(groupCombiner)
>   .withForwardedFields("f0->_1")
>   .groupBy(0, 1)
>   .reduceGroup(groupReducer)
>   .withForwardedFields("_1")
>   .output(outputFormat)
>
> It seems to work well, and the semantic annotation does remove a hash
> partition from the execution plan.
>
> Regards,
> Alexis.
>
>
> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske  wrote:
>
>> Hi Alexis,
>>
>> The Scala API does not expose a DataSource object but only a Scala
>> DataSet which wraps the Java object.
>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>
>> val dbData: DataSet[...] = ???
>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>
>> So you first have to get the wrapped Java DataSet, cast it to DataSource
>> and then get the properties.
>> It's not very nice, but should work.
>>
>> In order to use SDPs, you should be a bit familiar with how physical data
>> properties are propagated and discarded in the optimizer.
>> For example, applying a simple MapFunction removes all properties because
>> the function might have changed the fields on which a DataSet is
>> partitioned or sorted.
>> You can expose the behavior of a function to the optimizer by using
>> Semantic Annotations [1]
>>
>> Some comments on the code and plan you shared:
>> - You might want to add hostname to ORDER BY to have the output grouped
>> by (ts, hostname).
>> - Check the Global and Local data properties in the plan to validate that
>> the SDP were correctly interpreted.
>> - If the data is already correctly partitioned and sorted, you might not
>> need the Combiners. In either case, you probably want to annotate them with
>> forwarded fields annotations.
>>
>> The number of source tasks is unrelated to the number of splits. If you
>> have more tasks than splits, some tasks won't process any data.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>>
>>
>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda :
>>
>>> Hi Fabian,
>>>
>>> Thanks for the clarification. I have a few remarks, but let me provide
>>> more concrete information. You can find the query I'm using, the
>>> JDBCInputFormat creation, and the execution plan in this github gist:
>>>
>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>
>>> I cannot call getSplitDataProperties because
>>> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
>>> code, I do this instead:
>>>
>>> val javaEnv =
>>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
>>> "example")
>>>
>>> which feels wrong (the constructor doesn't accept a Scala environment).
>>> Is there a better alternative?
>>>
>>> I see absolutely no difference in the executi

Re: JDBCInputFormat and SplitDataProperties

2018-08-10 Thread Alexis Sarda
It ended up being a wrong configuration of the cluster; there was only 1
task manager with 1 slot.

If I submit a job with "flink run -p 24 ...", will the job hang until at
least 24 slots are available?

Regards,
Alexis.

On Fri, 10 Aug 2018, 14:01 Fabian Hueske  wrote:

> Can you share the plan for the program?
>
> Are you sure that more than 1 split is generated by the JdbcInputFormat?
>
> 2018-08-10 12:04 GMT+02:00 Alexis Sarda :
>
>> It seems I may have spoken too soon. After executing the job with more
>> data, I can see the following things in the Flink dashboard:
>>
>> - The first subtask is a chained DataSource -> GroupCombine. Even with
>> parallelism set to 24 and a ParameterValuesProvider returning
>> Array(Array("first"), Array("second")), only 1 thread processed all records.
>> - The second subtask is a Sorted Group Reduce, and I see two weird things:
>>   + The first subtask sent 5,923,802 records, yet the second subtask only
>> received 5,575,154 records?
>>   + Again, everything was done in a single thread, even though a groupBy
>> was used.
>> - The third and final subtask is a sink that saves back to the database.
>>
>> Does anyone know why parallelism is not being used?
>>
>> Regards,
>> Alexis.
>>
>>
>> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda 
>> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
>>> declares javaSet as private[flink], so I cannot access it directly.
>>> Nevertheless, I managed to get around it by using the java environment:
>>>
>>> val env = org.apache.flink.api.java.ExecutionEnvironment.
>>> getExecutionEnvironment
>>>
>>> val inputFormat = getInputFormat(query, dbUrl, properties)
>>> val outputFormat = getOutputFormat(dbUrl, properties)
>>>
>>> val source = env.createInput(inputFormat)
>>> val sdp = source.getSplitDataProperties
>>> sdp.splitsPartitionedBy(0)
>>> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>>
>>> // transform java DataSet to scala DataSet...
>>> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>>   .groupBy(0, 1)
>>>   .combineGroup(groupCombiner)
>>>   .withForwardedFields("f0->_1")
>>>   .groupBy(0, 1)
>>>   .reduceGroup(groupReducer)
>>>   .withForwardedFields("_1")
>>>   .output(outputFormat)
>>>
>>> It seems to work well, and the semantic annotation does remove a hash
>>> partition from the execution plan.
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske  wrote:
>>>
>>>> Hi Alexis,
>>>>
>>>> The Scala API does not expose a DataSource object but only a Scala
>>>> DataSet which wraps the Java object.
>>>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>>>
>>>> val dbData: DataSet[...] = ???
>>>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>>>
>>>> So you first have to get the wrapped Java DataSet, cast it to
>>>> DataSource and then get the properties.
>>>> It's not very nice, but should work.
>>>>
>>>> In order to use SDPs, you should be a bit familiar with how physical data
>>>> properties are propagated and discarded in the optimizer.
>>>> For example, applying a simple MapFunction removes all properties
>>>> because the function might have changed the fields on which a DataSet is
>>>> partitioned or sorted.
>>>> You can expose the behavior of a function to the optimizer by using
>>>> Semantic Annotations [1]
>>>>
>>>> Some comments on the code and plan you shared:
>>>> - You might want to add hostname to ORDER BY to have the output grouped
>>>> by (ts, hostname).
>>>> - Check the Global and Local data properties in the plan to validate
>>>> that the SDP were correctly interpreted.
>>>> - If the data is already correctly partitioned and sorted, you might
>>>> not need the Combiners. In either case, you probably want to annotate them
>>>> with forwarded fields annotations.
>>>>
>>>> The number of source tasks is unrelated to the number of splits. If you
>>>> have more tasks than splits, some tasks won't process any data.
>>>>
>>>> Be

Data loss when connecting keyed streams

2021-05-21 Thread Alexis Sarda-Espinosa
Hello everyone,

I just experienced something weird and I'd like to know if anyone has any idea 
of what could have happened.

I have a simple Flink cluster version 1.11.3 running on Kubernetes with a 
single worker.
I was testing a pipeline that connects 2 keyed streams and processes the result 
with a KeyedCoProcessFunction before writing the results to a database.
I enabled tracing in my logs and started sending test input data that would 
generate two keys, therefore the job would have 4 streams with 2 keys that 
would be connected into 2 streams.
In the logs I could see the data from the 4 streams with the correct keys, but 
the logs of the KeyedCoProcessFunction showed data for only one of the keys, 
and indeed the other key was never seen in my database.
I re-submitted the job and now it's behaving as expected without changing the 
code at all.

Is this a known issue? Has anyone else experienced something similar?

A sample of the code in case it's useful:

KeyedStream allEventsWithTopology = openedEventsStream
.getSideOutput(Filter.ALL_EVENTS_DISREGARDING_FILTER)
.flatMap(new TopologicalPartitionKeyAssigner(...))
.name("all-topological-events-stream")
.uid(operatorPrefix + "all-topological-events-stream")
.keyBy(keySelector);

DataStream validCorrelationEvents = correlationEventStream
.keyBy(new CorrelationEventKeySelector())
.connect(allEventsWithTopology)
.process(new CorrelationEventValidator(...))
.name("valid-correlation-events")
.uid(operatorPrefix + "valid-correlation-events");

Regards,
Alexis.



OOM Metaspace after multiple jobs

2021-07-07 Thread Alexis Sarda-Espinosa
Hello,

I am currently testing a scenario where I would run the same job multiple times 
in a loop with different inputs each time. I am testing with a local Flink 
cluster v1.12.4. I initially got an OOM - Metaspace error, so I increased the 
corresponding memory in the TM's JVM (to 512m), but it still fails sometimes.

I found this issue that talked about Python jobs: 
https://issues.apache.org/jira/browse/FLINK-20333, but there is a comment there 
saying that it would also affect Java jobs. The commit linked there seems to be 
concerned with Python only. Was this also fixed in 1.12.0 for Java?

Is there anything I could do to force a more thorough class loader cleanup 
after each call to execute() ?

Regards,
Alexis.




Re: OOM Metaspace after multiple jobs

2021-07-07 Thread Alexis Sarda-Espinosa
I now see there have been problems with this in the past:

https://issues.apache.org/jira/browse/FLINK-16142
https://issues.apache.org/jira/browse/FLINK-19005

I actually use both JDBC and gRPC, so it seems this could indeed be an issue 
for me. Does anyone know if I can ensure my classes get cleaned up? In this 
scenario only my jobs would be running in the cluster, so I can have a bit more 
control.

Regards,
Alexis.


From: Alexis Sarda-Espinosa 
Sent: Thursday, July 8, 2021 12:14 AM
To: user@flink.apache.org 
Subject: OOM Metaspace after multiple jobs

Hello,

I am currently testing a scenario where I would run the same job multiple times 
in a loop with different inputs each time. I am testing with a local Flink 
cluster v1.12.4. I initially got an OOM - Metaspace error, so I increased the 
corresponding memory in the TM's JVM (to 512m), but it still fails sometimes.

I found this issue that talked about Python jobs: 
https://issues.apache.org/jira/browse/FLINK-20333, but there is a comment there 
saying that it would also affect Java jobs. The commit linked there seems to be 
concerned with Python only. Was this also fixed in 1.12.0 for Java?

Is there anything I could do to force a more thorough class loader cleanup 
after each call to execute() ?

Regards,
Alexis.




Re: Using Flink's Kubernetes API inside Java

2021-07-02 Thread Alexis Sarda-Espinosa
Hi Roman,

In the operator I mentioned I see logic like the one here: 
https://github.com/wangyang0918/flink-native-k8s-operator/blob/a60a9826d4bcdaa4f23cf296d95954b9f6f328c3/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java#L169

For instance, DefaultClusterClientServiceLoader is annotated with @Internal: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.html

And just like ApplicationClusterDeployer is in charge of Application Mode 
(according to the Javadoc: https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.html),
 I was wondering if there's something similar for Session Mode.

Perhaps I should subscribe to the developer mailing list? Although I guess 
that's part of the question, whether those classes count as User API or 
Developer API.

I took a quick glance at the operator you mentioned, but I'm hoping I can make 
use of Flink's new support for pod templates to make it as similar as possible 
to a native Deployment resource.

Regards,
Alexis.

From: Roman Khachatryan 
Sent: Friday, July 2, 2021 9:19 PM
To: Alexis Sarda-Espinosa ; Yang Wang 

Cc: user@flink.apache.org 
Subject: Re: Using Flink's Kubernetes API inside Java

Hi Alexis,

Have you looked at flink-on-k8s-operator [1]?
It seems to have the functionality you need:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/controllers/flinkcluster_reconciler.go#L569

I couldn't find many Flink-specific classes in the operator you
mentioned, but in general classes annotated with Public and
PublicEvolving are unlikely to change if that's your concern.

Also pulling in Yang Wang.

[1]
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/


Regards,
Roman


On Thu, Jul 1, 2021 at 7:49 PM Alexis Sarda-Espinosa
 wrote:
>
> Hello everyone,
>
> I'm testing a custom Kubernetes operator that should fulfill some specific 
> requirements I have for Flink. I know of this WIP project: 
> https://github.com/wangyang0918/flink-native-k8s-operator
>
> I can see that it uses some classes that aren't publicly documented, and I 
> believe it doesn't cover Flink K8s sessions, which I would like to use.
>
> Is there something I could use for Flink K8s sessions? And is it ok if I use 
> these classes knowing that I might need adjustments for future Flink versions?
>
> Regards,
> Alexis.
>


RE: Using Flink's Kubernetes API inside Java

2021-07-07 Thread Alexis Sarda-Espinosa
Thanks Roman and Yang, I understand. I’ll have a look and ask on the developer 
list depending on what I find.

Regards,
Alexis.

From: Yang Wang 
Sent: Mittwoch, 7. Juli 2021 05:14
To: ro...@apache.org
Cc: Alexis Sarda-Espinosa ; 
user@flink.apache.org
Subject: Re: Using Flink's Kubernetes API inside Java

Hi Alexis,

I used to create a ticket[1] to make the ApplicationDeployer interface public. 
However, the community is very careful about adding new public interfaces,
because that will make maintenance more difficult. AFAIK, the 
ApplicationDeployer is stable and it is a very basic requirement for 
Application mode.
I do not see an obvious reason to change it in the future.

Unfortunately, Flink does not have a common interface for deploying a session 
cluster. I think you could have your own SessionClusterDeployer, it could be
implemented as follows.


final ClusterClientFactory<String> clientFactory =
        clientServiceLoader.getClusterClientFactory(configuration);
try (final ClusterDescriptor<String> clusterDescriptor =
        clientFactory.createClusterDescriptor(configuration)) {
    final ClusterSpecification clusterSpecification =
            clientFactory.getClusterSpecification(configuration);

    clusterDescriptor.deploySessionCluster(clusterSpecification);
}

By setting the DeploymentOptions.TARGET to "kubernetes-session", you could 
activate the K8s descriptor.
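
For example, a minimal sketch of that setting (shown in Scala for brevity; only
DeploymentOptions.TARGET and the "kubernetes-session" value come from the text above):

import org.apache.flink.configuration.{Configuration, DeploymentOptions}

val configuration = new Configuration()
// activate the Kubernetes session descriptor for the service loader above
configuration.set(DeploymentOptions.TARGET, "kubernetes-session")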


[1]. https://issues.apache.org/jira/browse/FLINK-21807


Best,
Yang

Roman Khachatryan <ro...@apache.org> wrote on Wed, Jul 7, 2021 at 12:18 AM:
Hi Alexis,

KubernetesSessionCli provides a similar functionality IIUC but it's
also marked as @Internal (so it likely will change in the future; the
REST APIs it uses aren't likely to change, but I guess it doesn't help
as you'd like some helper classes.).
I think it's a good idea to ask this question on a dev mailing list.


Regards,
Roman

On Fri, Jul 2, 2021 at 11:19 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
>
> Hi Roman,
>
> In the operator I mentioned I see logic like the one here: 
> https://github.com/wangyang0918/flink-native-k8s-operator/blob/a60a9826d4bcdaa4f23cf296d95954b9f6f328c3/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java#L169
>
> For instance, DefaultClusterClientServiceLoader is annotated with @Internal: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.html
>
> And just like ApplicationClusterDeployer is in charge of Application Mode 
> (according to the Javadoc), I was wondering if there's something similar for 
> Session Mode.
>
> Perhaps I should subscribe to the developer mailing list? Although I guess 
> that's part of the question, whether those classes count as User API or 
> Developer API.
>
> I took a quick glance at the operator you mentioned, but I'm hoping I can 
> make use of Flink's new support for pod templates to make it as similar as 
> possible to a native Deployment resource.
>
> Regards,
> Alexis.
> 
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Friday, July 2, 2021 9:19 PM
> To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>; Yang Wang <danrtsey...@gmail.com>
> Cc: user@flink.apache.org
> Subject: Re: Using Flink's Kubernetes API inside Java
>
> Hi Alexis,
>
> Have you looked at flink-on-k8s-operator [1]?
> It seems to have the functionality you need:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/controllers/flinkcluster_reconciler.go#L569
>
> I couldn't find many Flink-specific classes in the operator you
> mentioned, but in general classes annotated with Public and
> PublicEvolving are unlikely to change if that's your concern.
>
> Also pulling in Yang Wang.
>
> [1]
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/
>
>
> Regards,
> Roman
>
>
> On Thu, Jul 1, 2021 at 7:49 PM Alexis Sarda-Espinosa
> <alexis.sarda-espin...@microfocus.com> wrote:
> >
> > Hello everyone,
> >
> > I'm testing a custom Kubernetes operator that should fulfill some specific 
> > requirements I have for Flink. I know of this WIP project: 
> > https://github.com/wangyang0918/flink-native-k8s-operator
> >
> > I can see that it uses some classes that aren't publicly documented, and I 
> > believe it doesn't cover Flink K8s sessions, which I would like to use.
> >
> > Is there something I could use for Flink K8s sessions? And is it ok if I 
> > use these classes knowing that I might need adjustments for future Flink 
> > versions?
> >
> > Regards,
> > Alexis.
> >


Using Flink's Kubernetes API inside Java

2021-07-01 Thread Alexis Sarda-Espinosa
Hello everyone,

I'm testing a custom Kubernetes operator that should fulfill some specific 
requirements I have for Flink. I know of this WIP project: 
https://github.com/wangyang0918/flink-native-k8s-operator

I can see that it uses some classes that aren't publicly documented, and I 
believe it doesn't cover Flink K8s sessions, which I would like to use.

Is there something I could use for Flink K8s sessions? And is it ok if I use 
these classes knowing that I might need adjustments for future Flink versions?

Regards,
Alexis.



Re: [Schema Evolution] Cannot restore from savepoint after deleting field from POJO

2021-04-29 Thread Alexis Sarda-Espinosa
Hello,

I see that new Jira bots are now active. If no one has time to look at this, 
could documentation at least be updated to reflect the fact that removing 
fields from POJOs will break state restoration?

Regards,
Alexis.


From: Roman Khachatryan 
Sent: Friday, March 12, 2021 6:22 PM
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org 
Subject: Re: [Schema Evolution] Cannot restore from savepoint after deleting 
field from POJO

Hi Alexis,

This looks like a bug, I've created a Jira ticket to address it [1].
Please feel free to provide any additional information.

In particular, whether you are able to reproduce it in any of the
subsequent releases.

[1]
https://issues.apache.org/jira/browse/FLINK-21752

Regards,
Roman


On Thu, Mar 11, 2021 at 5:36 PM Alexis Sarda-Espinosa
 wrote:
>
> Hi everyone,
>
>
>
> It seems I’m having either the same problem, or a problem similar to the one 
> mentioned here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-when-restoring-from-savepoint-with-missing-state-amp-POJO-modification-td39945.html
>
>
>
> I have a POJO class that is used in Flink state. The class is annotated with 
> @TypeInfo as described, e.g., here: 
> https://stackoverflow.com/a/64721838/5793905
>
>
>
> Now I want to remove a field from the POJO. This removal is also considered 
> in the corresponding TypeInfoFactory. However, after trying to restore from a 
> savepoint where the POJO still had the field I get this exception:
>
>
>
> 2021-03-10T20:51:30.406Z INFO  org.apache.flink.runtime.taskmanager.Task:960 
> … (6/8) (d630d5ff0d7ae4fbc428b151abebab52) switched from RUNNING to FAILED. 
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
>
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> KeyedCoProcessOperator_c535ac415eeb524d67c88f4a481077d2_(6/8) from any of the 
> 1 provided restore options.
>
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
>
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>
> ... 6 common frames omitted
>
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore heap backend
>
> at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>
> at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
>
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
>
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
> ... 8 common frames omitted
>
> Caused by: java.lang.NullPointerException: null
>
> at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
>
> at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
>
> at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
>
> at 
> org.apache.flink.api.common.typeutils.CompositeSerializer$PrecomputedParameters.precompute(CompositeSerializer.java:228)
>
> at 
> org.apache.flink.api.common.typeutils.CompositeSerializer.<init>(CompositeSerializer.java:51)
>
> at 
> org.apache.flink.run

DataStream in batch mode - handling (un)ordered bounded data

2021-03-12 Thread Alexis Sarda-Espinosa
Hello,

Regarding the new BATCH mode of the data stream API, I see that the 
documentation states that some operators will process all data for a given key 
before moving on to the next one. However, I don't see how Flink is supposed to 
know whether the input will provide all data for a given key sequentially. In 
the DataSet API, an (undocumented?) feature is using SplitDataProperties 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/io/SplitDataProperties.html)
 to specify different grouping/partitioning/sorting properties, so if the data 
is pre-sorted (e.g. when reading from a database), some operations can be 
optimized. Will the DataStream API get something similar?

Regards,
Alexis.



[Schema Evolution] Cannot restore from savepoint after deleting field from POJO

2021-03-11 Thread Alexis Sarda-Espinosa
Hi everyone,

It seems I'm having either the same problem, or a problem similar to the one 
mentioned here: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-when-restoring-from-savepoint-with-missing-state-amp-POJO-modification-td39945.html

I have a POJO class that is used in Flink state. The class is annotated with 
@TypeInfo as described, e.g., here: https://stackoverflow.com/a/64721838/5793905

Now I want to remove a field from the POJO. This removal is also considered in 
the corresponding TypeInfoFactory. However, after trying to restore from a 
savepoint where the POJO still had the field I get this exception:

2021-03-10T20:51:30.406Z INFO  org.apache.flink.runtime.taskmanager.Task:960 
... (6/8) (d630d5ff0d7ae4fbc428b151abebab52) switched from RUNNING to FAILED. 
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedCoProcessOperator_c535ac415eeb524d67c88f4a481077d2_(6/8) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 6 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 8 common frames omitted
Caused by: java.lang.NullPointerException: null
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
at 
org.apache.flink.api.common.typeutils.CompositeSerializer$PrecomputedParameters.precompute(CompositeSerializer.java:228)
at 
org.apache.flink.api.common.typeutils.CompositeSerializer.<init>(CompositeSerializer.java:51)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer.<init>(TtlStateFactory.java:250)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:359)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:330)
at 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at 
java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
at 

Re: DataStream in batch mode - handling (un)ordered bounded data

2021-03-13 Thread Alexis Sarda-Espinosa
Hi Dawid,

I've entered a ticket: https://issues.apache.org/jira/browse/FLINK-21763. 
Personally, I can keep using the DataSet API for now, but if it will be 
deprecated at some point, it would be good to migrate rather sooner than later.

Regards,
Alexis.


From: Dawid Wysakowicz
Sent: Friday, March 12, 2021 4:10 PM
To: Alexis Sarda-Espinosa; user@flink.apache.org
Subject: Re: DataStream in batch mode - handling (un)ordered bounded data


Hi Alexis,

As of now there is no such feature in the DataStream API. The Batch mode in 
DataStream API is a new feature and we would be interested to hear about the 
use cases people want to use it for to identify potential areas to improve. 
What you are suggesting generally makes sense, so I think it would be nice if you 
could create a jira ticket for it.

Best,

Dawid

On 12/03/2021 15:37, Alexis Sarda-Espinosa wrote:

Hello,



Regarding the new BATCH mode of the data stream API, I see that the 
documentation states that some operators will process all data for a given key 
before moving on to the next one. However, I don’t see how Flink is supposed to 
know whether the input will provide all data for a given key sequentially. In 
the DataSet API, an (undocumented?) feature is using SplitDataProperties 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/io/SplitDataProperties.html)
 to specify different grouping/partitioning/sorting properties, so if the data 
is pre-sorted (e.g. when reading from a database), some operations can be 
optimized. Will the DataStream API get something similar?



Regards,

Alexis.




Clarification about Flink's managed memory and metric monitoring

2021-04-13 Thread Alexis Sarda-Espinosa
Hello,

I have a Flink TM configured with taskmanager.memory.managed.size: 1372m. There 
is a streaming job using RocksDB for checkpoints, so I assume some of this 
memory will indeed be used.

I was looking at the metrics exposed through the REST interface, and I queried 
some of them:

/taskmanagers/c3c960d79c1eb2341806bfa2b2d66328/metrics?get=Status.JVM.Memory.Heap.Committed,Status.JVM.Memory.NonHeap.Committed,Status.JVM.Memory.Direct.MemoryUsed
 | jq
[
  {
"id": "Status.JVM.Memory.Heap.Committed",
"value": "1652031488"
  },
  {
"id": "Status.JVM.Memory.NonHeap.Committed",
"value": "234291200"
 223 MiB
  },
  {
"id": "Status.JVM.Memory.Direct.MemoryUsed",
"value": "375015427"
358 MiB
  },
  {
"id": "Status.JVM.Memory.Direct.TotalCapacity",
"value": "375063552"
358 MiB
  }
]

I presume direct memory is being used by Flink and its networking stack, as 
well as by the JVM itself. To be sure:


  1.  Increasing "taskmanager.memory.framework.off-heap.size" or 
"taskmanager.memory.task.off-heap.size" should increase 
Status.JVM.Memory.Direct.TotalCapacity, right?
  2.  I presume the native memory used by RocksDB cannot be tracked with these 
JVM metrics even if "state.backend.rocksdb.memory.managed" is true, right?

Based on this question: 
https://stackoverflow.com/questions/30622818/off-heap-native-heap-direct-memory-and-native-memory,
 I imagine Flink/RocksDB either allocates memory completely independently of 
the JVM, or it uses unsafe. Since the documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html#managed-memory)
 states that "Managed memory is managed by Flink and is allocated as native 
memory (off-heap)", I thought this native memory might show up as part of 
direct memory tracking, but I guess it doesn't.

Regards,
Alexis.



RE: Clarification about Flink's managed memory and metric monitoring

2021-04-13 Thread Alexis Sarda-Espinosa
Hi Xintong,

Thanks for the info. Is there any way to access these metrics outside of the 
UI? I suppose Flink’s reporters might provide them, but will they also be 
available through the REST interface (or another interface)?

Regards,
Alexis.

From: Xintong Song 
Sent: Tuesday, 13 April 2021 14:30
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Clarification about Flink's managed memory and metric monitoring

Hi Alexis,

First of all, I strongly recommend not to look into the JVM metrics. These 
metrics are fetched directly from JVM and do not well correspond to Flink's 
memory configurations. They were introduced a long time ago and are preserved 
mostly for compatibility. IMO, they bring more confusion than convenience. In 
Flink-1.12, there is a newly designed TM metrics page in the web ui, which 
clearly shows how the metrics correspond to Flink's memory configurations (if 
any).

Concerning your questions.
1. Yes, increasing framework/task off-heap memory sizes should increase the 
direct memory capacity. Increasing the network memory size should also do that.
2. When 'state.backend.rocksdb.memory.managed' is true, RocksDB uses managed 
memory. Managed memory is not measured by any JVM metrics. It's not managed by 
JVM, meaning that it's not limited by '-XX:MaxDirectMemorySize' and is not 
controlled by the garbage collectors.


Thank you~

Xintong Song


On Tue, Apr 13, 2021 at 7:53 PM Alexis Sarda-Espinosa 
<alexis.sarda-espin...@microfocus.com> wrote:
Hello,

I have a Flink TM configured with taskmanager.memory.managed.size: 1372m. There 
is a streaming job using RocksDB for checkpoints, so I assume some of this 
memory will indeed be used.

I was looking at the metrics exposed through the REST interface, and I queried 
some of them:

/taskmanagers/c3c960d79c1eb2341806bfa2b2d66328/metrics?get=Status.JVM.Memory.Heap.Committed,Status.JVM.Memory.NonHeap.Committed,Status.JVM.Memory.Direct.MemoryUsed
 | jq
[
  {
"id": "Status.JVM.Memory.Heap.Committed",
"value": "1652031488"
  },
  {
"id": "Status.JVM.Memory.NonHeap.Committed",
"value": "234291200"
 223 MiB
  },
  {
"id": "Status.JVM.Memory.Direct.MemoryUsed",
"value": "375015427"
358 MiB
  },
  {
"id": "Status.JVM.Memory.Direct.TotalCapacity",
"value": "375063552"
358 MiB
  }
]

I presume direct memory is being used by Flink and its networking stack, as 
well as by the JVM itself. To be sure:


  1.  Increasing "taskmanager.memory.framework.off-heap.size" or 
"taskmanager.memory.task.off-heap.size" should increase 
Status.JVM.Memory.Direct.TotalCapacity, right?
  2.  I presume the native memory used by RocksDB cannot be tracked with these 
JVM metrics even if "state.backend.rocksdb.memory.managed" is true, right?

Based on this question: 
https://stackoverflow.com/questions/30622818/off-heap-native-heap-direct-memory-and-native-memory,
 I imagine Flink/RocksDB either allocates memory completely independently of 
the JVM, or it uses unsafe. Since the documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html#managed-memory)
 states that "Managed memory is managed by Flink and is allocated as native 
memory (off-heap)", I thought this native memory might show up as part of 
direct memory tracking, but I guess it doesn’t.

Regards,
Alexis.



RE: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-08-26 Thread Alexis Sarda-Espinosa
I think it would be nice if the task manager pods get their values from the 
configuration file only if the pod templates don’t specify any resources. That 
was the goal of supporting pod templates, right? Allowing more custom scenarios 
without letting the configuration options get bloated.

Regards,
Alexis.

From: Denis Cosmin NUTIU 
Sent: Donnerstag, 26. August 2021 15:55
To: matth...@ververica.com
Cc: user@flink.apache.org; danrtsey...@gmail.com
Subject: Re: Deploying Flink on Kubernetes with fractional CPU and different 
limits and requests

Hi Matthias,

Thanks for getting back to me and for your time!

We have some Flink jobs deployed on Kubernetes and running kubectl top pod 
gives the following result:

NAME                  CPU(cores)   MEMORY(bytes)
aa-78c8cb77d4-zlmpg   8m           1410Mi
aa-taskmanager-2-2    32m          1066Mi
bb-5f7b65f95c-jwb7t   7m           1445Mi
bb-taskmanager-2-2    32m          1016Mi
cc-54d967b55d-b567x   11m          514Mi
cc-taskmanager-4-1    11m          496Mi
dd-6fbc6b8666-krhlx   10m          535Mi
dd-taskmanager-2-2    12m          522Mi
xx-6845cf7986-p45lq   53m          526Mi
xx-taskmanager-5-2    11m          507Mi

During low workloads the jobs consume just about 100m CPU and during high 
workloads the CPU consumption increases to 500m-1000m. Having the ability to 
specify requests and limit separately would give us more deployment flexibility.

Sincerely,
Denis

On Thu, 2021-08-26 at 14:22 +0200, Matthias Pohl wrote:

CAUTION: This email originated from outside of our organization. Do not click 
links or open attachments unless you recognize the sender and know the content 
is safe.
Hi Denis,
I did a bit of digging: It looks like there is no way to specify them 
independently. You can find documentation about pod templates for TaskManager 
and JobManager [1]. But even there it states that for cpu and memory, the 
resource specs are overwritten by the Flink configuration. The code also 
reveals that limit and requests are set using the same value [2].

I'm going to pull Yang Wang into this thread. I'm wondering whether there is a 
reason for that or whether it makes sense to create a Jira issue introducing 
more specific configuration parameters for limit and requests.

Best,
Matthias

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink
[2] 
https://github.com/apache/flink/blob/f64261c91b195ecdcd99975b51de540db89a3f48/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java#L324-L332

On Thu, Aug 26, 2021 at 11:17 AM Denis Cosmin NUTIU 
<dnu...@bitdefender.com> wrote:
Hello,

I've developed a Flink job and I'm trying to deploy it on a Kubernetes
cluster using Flink Native.

Setting kubernetes.taskmanager.cpu=0.5 and
kubernetes.jobmanager.cpu=0.5 sets the requests and limits to 500m,
which is correct, but I'd like to set the requests and limits to
different values, something like:

resources:
  requests:
memory: "1048Mi"
cpu: "100m"
  limits:
memory: "2096Mi"
cpu: "1000m"

I've tried using pod templates from Flink 1.13 and manually patching
the Kubernetes deployment file; the jobmanager gets spawned with the
correct resource requests and limits but the taskmanagers get spawned
with the defaults:

Limits:
  cpu: 1
  memory:  1728Mi
Requests:
  cpu: 1
  memory:  1728Mi

Is there any way I could set the requests/limits for the CPU/Memory to
different values when deploying Flink in Kubernetes? If not, would it
make sense to request this as a feature?

Thanks in advance!

Denis


Keystore format limitations for TLS

2021-08-16 Thread Alexis Sarda-Espinosa
Hello,

I am trying to configure TLS communication for a Flink cluster running on 
Kubernetes. I am currently using the BCFKS format and setting that as default 
via javax.net.ssl.keystoretype and javax.net.ssl.truststoretype (which are 
injected in the environment variable FLINK_ENV_JAVA_OPTS). The task manager is 
failing with "Invalid Keystore format", so I'm wondering if there are special 
limitations with regards to supported TLS configurations?

Regards,
Alexis.



RE: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-09-02 Thread Alexis Sarda-Espinosa
Just to provide my opinion, I find the idea of factors unintuitive for this 
specific case. When I’m working with Kubernetes resources and sizing, I have to 
think in absolute terms for all pods and define requests and limits with 
concrete values. Using factors for Flink means that I have to think differently 
for my Flink resources, and if I’m using pod templates, it makes this switch 
more jarring because I define what is essentially another Kubernetes resource 
that I’m familiar with, but some of the values in my template are ignored. 
Additionally, if I understand correctly, factors aren’t linear, right? If 
someone specifies a 1GiB request with a factor of 1.5, they only get 500MiB on 
top, but if they specify 10GiB, suddenly the limit goes all the way up to 15GiB.

Regards,
Alexis.

From: spoon_lz 
Sent: Donnerstag, 2. September 2021 14:12
To: Yang Wang 
Cc: Denis Cosmin NUTIU ; Alexis Sarda-Espinosa 
; matth...@ververica.com; 
user@flink.apache.org
Subject: Re: Deploying Flink on Kubernetes with fractional CPU and different 
limits and requests

Hi Yang,
I agree with you, but I think the limit-factor should be greater than or equal 
to 1, and default to 1 is a better choice.
If the default value is 1.5, the memory limit will exceed the actual physical 
memory of a node, which may result in OOM, machine downtime, or random pod 
death if the node runs full.
For some required jobs, increase this value appropriately.

Best,
Zhuo


On 09/02/2021 11:50, Yang Wang <danrtsey...@gmail.com> wrote:
Given that the limit-factor should be greater than 1, using the 
limit-factor could also work for memory.

> Why do we need a larger memory resource limit than request?
A typical use case I could imagine is the page cache. Having more page cache 
might improve the performance.
And they could be reclaimed when the Kubernetes node does not have enough 
memory.

I still believe that it is the user's responsibility to configure a proper 
resource (memory and CPU) that is not too big, and
to use the limit-factor to allow the Flink job to benefit from burst 
resources.


Best,
Yang

spoon_lz <spoon...@126.com> wrote on Wed, Sep 1, 2021 at 8:12 PM:
Yes, shrinking the requested memory will result in OOM. We do this because the 
user-created job provides an initial value (for example, 2 cpus and 4096MB of 
memory for TaskManager). In most cases, the user will not change this value 
unless the task fails or there is an exception such as data delay. This results 
in a waste of memory for most simple ETL tasks. These isolated situations may 
not apply to more Flink users. We can adjust Kubernetes instead of Flink to 
solve the resource waste problem.
Just adjusting the CPU value might be a more robust choice, and there are 
probably some scenarios for both decreasing the CPU request and increasing the 
CPU limit

Best,
Zhuo

On 09/01/2021 19:39, Yang Wang <danrtsey...@gmail.com> wrote:
Hi Lz,

Thanks for sharing your ideas.

I have to admit that I prefer the limit factor to set the resource limit, not 
the percentage to set the resource request.
Because usually the resource request is configured or calculated by Flink, and 
it indicates the resources the user requires.
It has the same semantics for all deployments (e.g. Yarn, K8s). Especially for 
the memory resource, giving a discount
on the resource request may cause OOM.
BTW, I am wondering why the users do not allocate fewer resources if they do 
not need them.

@Denis Cosmin NUTIU <dnu...@bitdefender.com> I really appreciate that
you want to work on this feature. Let's first reach a consensus
about the implementation. And then opening a PR is welcome.


Best,
Yang


spoon_lz mailto:spoon...@126.com>> 于2021年9月1日周三 下午4:36写道:

Hi everyone,
I have some other ideas for the Kubernetes resource settings. As described by 
Yang Wang in [FLINK-15648], the proposal increases the CPU limit by a certain 
percentage to provide more computational performance for jobs. Should we 
consider the alternative of shrinking the request to start more jobs, which 
would improve cluster resource utilization? For example, for some low-traffic 
tasks, we could even set the CPU request to 0 in extreme cases. Both limit 
enlargement and request shrinkage may be required.

Best,
Lz
On 09/1/2021 16:06,Denis Cosmin 
NUTIU<mailto:dnu...@bitdefender.com> wrote:
Hi Yang,

I have limited Flink internals knowledge, but I can try to implement 
FLINK-15648 and open up a PR on GitHub or send the patch via email. How does 
that sound?
I'll sign the ICLA and switch to my personal address.

Sincerely,
Denis

On Wed, 2021-09-01 at 13:48 +0800, Yang Wang wrote:
Great. If no one wants to work on this ticket FLINK-15648, I will try to get 
this done in the next major release cycle(1.15).

Best,
Yang

Denis Cosmin NUTIU mailto:dnu...@bitdefender.com>> 
于2021年8月31日周二 下午4:59写道:
Hi everyone,

Thanks for getting back to me!

>  I think it would be nice if the task manager pods get 

Re: logback variable substitution in kubernetes

2021-09-01 Thread Alexis Sarda-Espinosa
I'm fairly certain you need curly braces surrounding the variable, e.g. 
${LOG_FILE} rather than $LOG_FILE; the substitution is not done by the shell, 
it just uses similar syntax (as mentioned in the doc 
http://logback.qos.ch/manual/configuration.html#variableSubstitution).


Regards,
Alexis.


From: houssem 
Sent: Wednesday, September 1, 2021 7:02 PM
To: user@flink.apache.org 
Subject: Re: logback variable substitution in kubernetes


Yes, I did all of that.

When I hard-code the log level and the file name, everything works fine,

but when I try to use variables, they won't be replaced.

On 2021/09/01 11:43:21, Yang Wang  wrote:
> Have you removed the log4j-related jars from the $FLINK_HOME/lib
> directory?
> Refer to the documentation[1] for how to use logback.
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/advanced/logging/#configuring-logback
>
> Best,
> Yang
>
> houssem  于2021年9月1日周三 下午5:00写道:
>
> > Yes i did this verification and i have all environment variables.
> >
> > On 2021/09/01 06:09:27, Yang Wang  wrote:
> > > From the logback documentation[1], it could support OS
> > > environment substitution.
> > > Could you please check that the environment variables have been properly
> > > set?
> > > Maybe you could tunnel into the Kubernetes pod via "kubectl exec" and do
> > > such verification.
> > >
> > > Best,
> > > Yang
> > >
> > > houssem  于2021年8月31日周二 下午7:28写道:
> > >
> > > >
> > > > Hello,
> > > >
> > > > I am running a Flink application cluster in standalone Kubernetes mode
> > > > and I am using logback as a logging framework. The problem is that I am
> > > > not able to use environment variables configured in my pod inside my
> > > > logback-console.xml file.
> > > >
> > > > I copied this file from my file system while building my image.
> > > > Dockerfile:
> > > > ..
> > > > COPY logback-console.xml $FLINK_HOME/conf/
> > > > ..
> > > >
> > > > here is my logback-console file:
> > > >
> > > > <configuration>
> > > >
> > > >  <!-- Appenders -->
> > > >  <!-- #file -->
> > > >  <appender name="file" class="ch.qos.logback.core.FileAppender">
> > > >    <file>$LOG_FILE</file>
> > > >    <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
> > > >      <layout class="ch.qos.logback.classic.PatternLayout">
> > > >        <pattern>...%n</pattern>
> > > >      </layout>
> > > >    </encoder>
> > > >  </appender>
> > > >
> > > >  <!-- #console -->
> > > >  <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
> > > >    <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
> > > >      <layout class="ch.qos.logback.classic.PatternLayout">
> > > >        <pattern>...%n</pattern>
> > > >      </layout>
> > > >    </encoder>
> > > >  </appender>
> > > >
> > > >  <root level="...">
> > > >    <appender-ref ref="file"/>
> > > >    <appender-ref ref="console"/>
> > > >  </root>
> > > > </configuration>
> > > >
> > >
> >
>


RE: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-09-03 Thread Alexis Sarda-Espinosa
Hi Yang,

I understand the issue, and yes, if Flink memory must be specified in the 
configuration anyway, it’s probably better to leave memory configuration in the 
templates empty.

For the CPU case I still think the template’s requests/limits should have 
priority if they are specified. The factor could still be used if the template 
doesn’t specify anything. I’m not sure if it would be entirely intuitive, but 
the logic could be something like this:


  1.  To choose CPU request
 *   Read pod template first
 *   If template doesn’t have anything, read from kubernetes.taskmanager.cpu
 *   If configuration is not specified, fall back to default
  2.  To choose CPU limit
 *   Read from template first
 *   If template doesn’t have anything, apply factor to what was chosen in 
step 1, where the default factor is 1.
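
As a rough sketch of that precedence in plain Java (the names and defaults here 
are illustrative, not actual Flink classes or option values):

public final class CpuResourcePrecedence {

    // Step 1: choose the CPU request.
    static double chooseCpuRequest(Double templateRequest, Double configuredCpu, double defaultCpu) {
        if (templateRequest != null) {
            return templateRequest;      // pod template wins if it specifies something
        }
        if (configuredCpu != null) {
            return configuredCpu;        // otherwise kubernetes.taskmanager.cpu
        }
        return defaultCpu;               // otherwise fall back to the default
    }

    // Step 2: choose the CPU limit.
    static double chooseCpuLimit(Double templateLimit, double chosenRequest, double limitFactor) {
        if (templateLimit != null) {
            return templateLimit;            // pod template wins if it specifies something
        }
        return chosenRequest * limitFactor;  // otherwise apply the factor (default 1) to step 1
    }
}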

Regards,
Alexis.

From: Yang Wang 
Sent: Freitag, 3. September 2021 08:09
To: Alexis Sarda-Espinosa 
Cc: spoon_lz ; Denis Cosmin NUTIU ; 
matth...@ververica.com; user@flink.apache.org
Subject: Re: Deploying Flink on Kubernetes with fractional CPU and different 
limits and requests

Hi Alexis

Thanks for your valuable inputs.

First, I want to share why Flink has to overwrite the resources which are
defined in the pod template. You can find the fields that will be overwritten
by Flink here [1]. I think the major reason is that Flink needs to ensure
consistency between the Flink configuration (taskmanager.memory.process.size,
kubernetes.taskmanager.cpu) and the pod template resource settings. Since users
could specify the total process memory or the detailed memory [2], Flink
calculates the pod resources internally. If we allowed users to specify the
resources via the pod template, then the users would have to guarantee the
configuration consistency, especially when they specify the detailed memory
(e.g. heap, managed, off-heap, etc.). I believe that would be a new burden for
them.

For the limit-factor, you are right that factors aren’t linear. But I think the
factor is more flexible than an absolute value. A bigger pod usually could use
more burst resources. Moreover, I do not suggest setting a limit-factor for
memory since it does not bring much benefit. In comparison, burst CPU resources
could help a lot with performance.

[1]. 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#pod-template
[2]. 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#detailed-memory-model


@spoon_lz<mailto:spoon...@126.com> You are right. The limit-factor should be 
greater than or equal to 1. And the default value is 1.


Best,
Yang

Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 于2021年9月2日周四 下午8:20写道:
Just to provide my opinion, I find the idea of factors unintuitive for this 
specific case. When I’m working with Kubernetes resources and sizing, I have to 
think in absolute terms for all pods and define requests and limits with 
concrete values. Using factors for Flink means that I have to think differently 
for my Flink resources, and if I’m using pod templates, it makes this switch 
more jarring because I define what is essentially another Kubernetes resource 
that I’m familiar with, but some of the values in my template are ignored. 
Additionally, if I understand correctly, factors aren’t linear, right? If 
someone specifies a 1GiB request with a factor of 1.5, they only get 512MiB on 
top, but if they specify 10GiB, suddenly the limit goes all the way up to 15GiB.

Regards,
Alexis.

From: spoon_lz mailto:spoon...@126.com>>
Sent: Donnerstag, 2. September 2021 14:12
To: Yang Wang mailto:danrtsey...@gmail.com>>
Cc: Denis Cosmin NUTIU mailto:dnu...@bitdefender.com>>; 
Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>;
 matth...@ververica.com<mailto:matth...@ververica.com>; 
user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Deploying Flink on Kubernetes with fractional CPU and different 
limits and requests

Hi Yang,
I agree with you, but I think the limit-factor should be greater than or equal 
to 1, and defaulting to 1 is the better choice.
If the default value were 1.5, the memory limit could exceed the actual physical 
memory of a node, which may result in OOM, machine downtime, or random pod 
deaths if the node runs full.
For jobs that require it, this value can be increased appropriately.

Best,
Zhuo


On 09/2/2021 11:50,Yang 
Wang<mailto:danrtsey...@gmail.com> wrote:
Given that the limit-factor should be greater than 1, then using the 
limit-factor could also work for memory.

> Why do we need a larger memory resource limit than request?
A typical use case I could imagine is the page cache. Having more page cache 
might improve the performance.
And they could be reclaimed when the Kubernetes node does not have enough 
memory.

I still believe that it is the user responsibility to configure a proper 
r

Re: TaskManagers OOM'ing for Flink App with very large state only when restoring from checkpoint

2021-09-13 Thread Alexis Sarda-Espinosa
I'm not very knowledgeable when it comes to Linux memory management, but do 
note that Linux (and by extension Kubernetes) takes disk IO into account when 
deciding whether a process is using more memory than it's allowed to, see e.g. 
https://faun.pub/how-much-is-too-much-the-linux-oomkiller-and-used-memory-d32186f29c9d

Regards,
Alexis.


From: Guowei Ma 
Sent: Monday, September 13, 2021 8:35 AM
To: Kevin Lam 
Cc: user 
Subject: Re: TaskManagers OOM'ing for Flink App with very large state only when 
restoring from checkpoint

Hi, Kevin

1. Could you give me some specific information, such as which version of Flink 
you are using, and whether it uses DataStream or SQL?
2. As far as I know, RocksDB puts state on disk, so in theory it will not keep 
consuming memory and cause OOM.
So you can check whether there are any object leaks by analyzing a jmap dump of 
the TaskManager after the failover.
3. Another approach: you can trigger a savepoint first and then resume the job 
from the savepoint to see if there is still an OOM;
 if not, then it is likely related to your application code.

Best,
Guowei


On Sat, Sep 11, 2021 at 2:01 AM Kevin Lam 
mailto:kevin@shopify.com>> wrote:
Hi all,

We've seen scenarios where TaskManagers will begin to OOM, shortly after a job 
restore from checkpoint. Our flink app has a very large state (100s of GB) and 
we use RocksDB as a backend.

Our repro is something like this: run the job for an hour and let it accumulate 
state, kill a task manager. The job restores properly, but then minutes later 
task managers begin to be killed on K8S due to OOM, and this causes a 
degenerate state where the job restores and new OOMs cause the job to restore 
again and it never recovers.

We've tried increasing the TaskManager memory (doubled), and observed that OOMs 
still happen even when the allocated k8s container memory is not maxed out.

Can you shed some light on what happens during a restore process? How are 
checkpoints loaded, and how does this affect the memory pressure of task 
managers (that for eg. have had a task running, got it cancelled, and 
re-assigned a new task as part of restore)?

Any help is appreciated!



RE: Fast serialization for Kotlin data classes

2021-09-16 Thread Alexis Sarda-Espinosa
Someone please correct me if I’m wrong but, until FLINK-16686 [1] is fixed, a 
class must be a POJO to be used in managed state with RocksDB, right? That’s 
not to say that the approach with TypeInfoFactory won’t work, just that even 
then it will mean none of the data classes can be used for managed state.

[1] https://issues.apache.org/jira/browse/FLINK-16686

Regards,
Alexis.

From: Matthias Pohl 
Sent: Donnerstag, 16. September 2021 13:12
To: Alex Cruise 
Cc: Flink ML 
Subject: Re: Fast serialization for Kotlin data classes

Hi Alex,
have you had a look at TypeInfoFactory? That might be the best way to come up 
with a custom serialization mechanism. See the docs [1] for further details.
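
To make that more concrete, here is a minimal sketch of the wiring in Java 
(MyEvent is just a placeholder standing in for one of your Kotlin data classes, 
and Types.POJO is only used to keep the sketch compilable; for an immutable data 
class you would return a TypeInformation backed by your own serializer there):

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Placeholder type; imagine an immutable Kotlin data class with fields (id, value).
@TypeInfo(MyEventTypeInfoFactory.class)
class MyEvent {
    public String id;
    public long value;
}

// The factory referenced by the @TypeInfo annotation above: createTypeInfo decides which
// TypeInformation (and therefore which serializer) Flink uses for MyEvent.
class MyEventTypeInfoFactory extends TypeInfoFactory<MyEvent> {
    @Override
    public TypeInformation<MyEvent> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("id", Types.STRING);
        fields.put("value", Types.LONG);
        // For a Kotlin data class, a custom TypeInformation/TypeSerializer would be returned here instead.
        return Types.POJO(MyEvent.class, fields);
    }
}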

Best,
Matthias

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory

On Tue, Sep 14, 2021 at 8:33 PM Alex Cruise 
mailto:a...@cluonflux.com>> wrote:
Hi there,

I appreciate the fact that Flink has built-in support for making POJO and Scala 
`case class` serialization faster, but in my project we use immutable Kotlin 
`data class`es (analogous to Scala `case class`es) extensively, and we'd really 
prefer not to make them POJOs, mostly for style/taste reasons (e.g. need a 
default constructor and setters, both are anathema!)

Does anyone know of a good way for us to keep using idiomatic, immutable Kotlin 
data classes, but to get much faster serialization performance in Flink?

Thanks!

-0xe1a


Using POJOs with the table API

2021-08-05 Thread Alexis Sarda-Espinosa
Hi everyone,

I had been using the DataSet API until now, but since that's been deprecated, I 
started looking into the Table API. In my DataSet job I have a lot of POJOs, 
all of which are even annotated with @TypeInfo and provide the corresponding 
factories. The Table API documentation talks about POJOs here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#user-defined-data-types

I started with a single UDF to try it out (an AggregateFunction), but I have 
encountered several issues.

My accumulator class is a (serializable) POJO, but initially there were 
validation exceptions because the specified class is abstract. I added this to 
the class and got over that:

@FunctionHint(
accumulator = DataTypeHint("RAW", bridgedTo = MyAccumulator.class)
)

Then there were exceptions about the output type. Since it's also a POJO, I 
thought this would help:

@FunctionHint(
accumulator = DataTypeHint("RAW", bridgedTo = MyAccumulator::class),
output = DataTypeHint("RAW", bridgedTo =  MyDTO.class)
)

But no luck: org.apache.flink.table.api.TableException: Unsupported conversion 
from data type 'RAW('com.MyDTO', '...')' (conversion class: com.MyDTO) to type 
information. Only data types that originated from type information fully 
support a reverse conversion.

I figured I would try something simpler and first return a List<String> from my 
AggregateFunction. But how do I define that in a DataTypeHint? I'm not sure if 
that's documented, but I looked through LogicalTypeParser and used:

output = DataTypeHint("ARRAY")

But that throws an exception (see attachment): Table program cannot be 
compiled. This is a bug. Please file an issue.

I changed the List to String[] and that finally worked.
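
For illustration, the combination that worked looks roughly like this when 
written in Java (MyAccumulator and the aggregation logic below are just 
placeholders, not my actual classes):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;

// Placeholder accumulator standing in for the (serializable) POJO mentioned above.
class MyAccumulator {
    public List<String> values = new ArrayList<>();
}

// RAW accumulator bridged to the accumulator class; output hinted as ARRAY<STRING>,
// whose default conversion class is String[].
@FunctionHint(
        accumulator = @DataTypeHint(value = "RAW", bridgedTo = MyAccumulator.class),
        output = @DataTypeHint("ARRAY<STRING>"))
class MyAggregateFunction extends AggregateFunction<String[], MyAccumulator> {

    @Override
    public MyAccumulator createAccumulator() {
        return new MyAccumulator();
    }

    // Called by the runtime for each input row.
    public void accumulate(MyAccumulator acc, String value) {
        acc.values.add(value);
    }

    @Override
    public String[] getValue(MyAccumulator acc) {
        return acc.values.toArray(new String[0]);
    }
}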

Even getting a simple test running was difficult. I simply could not get 
TableEnvironment#fromValues to work with POJOs as input, and I tried many 
combinations of DataTypes.of(MyPojo.class)

At this point I still don't know how to return complex data structures 
encapsulated in POJOs from my UDF. Am I missing something very essential?

Regards,
Alexis.

org.apache.flink.table.api.TableException: Failed to wait job finish

at 
org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
at 
org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:130)
at 
org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
at com.E2ETest.tablePlayground(E2ETest.kt:149)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at 

RE: Troubleshooting checkpoint timeout

2021-10-21 Thread Alexis Sarda-Espinosa
I would really appreciate more fine-grained information regarding the factors 
that can affect a checkpoint’s:


  *   Sync duration
  *   Async duration
  *   Alignment duration
  *   Start delay

Otherwise those metrics don’t really help me know in which areas to look for 
issues.

Regards,
Alexis.

From: Alexis Sarda-Espinosa 
Sent: Mittwoch, 20. Oktober 2021 09:43
To: Parag Somani ; Caizhi Weng 
Cc: Flink ML 
Subject: RE: Troubleshooting checkpoint timeout

Currently the windows are 10 minutes in size with a 1-minute slide time. The 
approximate 500 event/minute throughput is already rather high for my use case, 
so I don’t expect it to be higher, but I would imagine that’s still pretty low.

I did have some issues with storage space, and I wouldn’t be surprised if there 
is an IO bottleneck in my dev environment, but then my main question would be: 
if IO is being throttled, could that result in the high “start delay” times I 
observe? That seems to be the main slowdown, so I just want to be sure I’m 
looking in the right direction.

I’d like to mention another thing about my pipeline’s structure in case it’s 
relevant, although it may be completely unrelated. I said that I specify the 
windowing properties once (windowedStream in my 1st e-mail) and use it twice, 
but it’s actually used 3 times. In addition to the 2 ProcessWindowFunctions 
that end in sinks, the stream is also joined with a side output:

openedEventsTimestamped = openedEvents
.getSideOutput(…)
.keyBy(keySelector)
.assignTimestampsAndWatermarks(watermarkStrategy)

windowedStream
.process(ProcessWindowFunction3())
.keyBy(keySelector)

.connect(DataStreamUtils.reinterpretAsKeyedStream(openedEventsTimestamped, 
keySelector))
.process(...)

Could this lead to delays or alignment issues?

Regards,
Alexis.

From: Parag Somani mailto:somanipa...@gmail.com>>
Sent: Mittwoch, 20. Oktober 2021 09:22
To: Caizhi Weng mailto:tsreape...@gmail.com>>
Cc: Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>;
 Flink ML mailto:user@flink.apache.org>>
Subject: Re: Troubleshooting checkpoint timeout

I had a similar problem, where two concurrent checkpoints were configured. 
Also, I used to save them in S3 (using MinIO) on a k8s 1.18 environment.

The Flink service was getting restarted and timeouts were happening. It got resolved by:
1. Fixing MinIO running out of disk space, which had caused the checkpoint 
failures (this was the main cause).
2. Adding checkpoint pacing parameters to address it:
execution.checkpointing.max-concurrent-checkpoints and 
execution.checkpointing.min-pause
Details of the same at:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#checkpointing


On Wed, Oct 20, 2021 at 7:50 AM Caizhi Weng 
mailto:tsreape...@gmail.com>> wrote:
Hi!

I see you're using sliding event time windows. What's the exact value of 
windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is large 
and windowSlideTimeMinutes is small then each record may be assigned to a large 
number of windows as the pipeline proceeds, thus gradually slows down 
checkpointing and finally causes a timeout.

Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 于2021年10月19日周二 下午7:29写道:
Hello everyone,

I am doing performance tests for one of our streaming applications and, after 
increasing the throughput a bit (~500 events per minute), it has started 
failing because checkpoints cannot be completed within 10 minutes. The Flink 
cluster is not exactly under my control and is running on Kubernetes with 
version 1.11.3 and RocksDB backend.

I can access the UI and logs and have confirmed:


  *   Logs do indicate expired checkpoints.
  *   There is no backpressure in any operator.
  *   When checkpoints do complete (seemingly at random):

 *   Size is 10-20MB.
 *   Sync and Async durations are at most 1-2 seconds.
 *   In one of the tasks, alignment takes 1-3 minutes, but start delays 
grow to up to 5 minutes.

  *   The aforementioned task (the one with 5-minute start delay) has 8 
sub-tasks and I see no indication of data skew. When the checkpoint times out, 
none of the sub-tasks have acknowledged the checkpoint.

The problematic task that is failing very often (and holding back downstream 
tasks) consists of the following operations:

timestampedEventStream = events
.keyBy(keySelector)
.assignTimestampsAndWatermarks(watermarkStrategy);

windowedStream = 
DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream, keySelector)
.window(SlidingEventTimeWindows.of(
Time.minutes(windowLengthMinutes),
Time.minutes(windowSlideTimeMinutes)))
.allowedLateness(Time.minutes(allowedLatenessMinutes));

windowedStream
.process(new ProcessWindowFunction1(config))
// add sink

windowe

Troubleshooting checkpoint timeout

2021-10-19 Thread Alexis Sarda-Espinosa
Hello everyone,

I am doing performance tests for one of our streaming applications and, after 
increasing the throughput a bit (~500 events per minute), it has started 
failing because checkpoints cannot be completed within 10 minutes. The Flink 
cluster is not exactly under my control and is running on Kubernetes with 
version 1.11.3 and RocksDB backend.

I can access the UI and logs and have confirmed:


  *   Logs do indicate expired checkpoints.
  *   There is no backpressure in any operator.
  *   When checkpoints do complete (seemingly at random):
 *   Size is 10-20MB.
 *   Sync and Async durations are at most 1-2 seconds.
 *   In one of the tasks, alignment takes 1-3 minutes, but start delays 
grow to up to 5 minutes.
  *   The aforementioned task (the one with 5-minute start delay) has 8 
sub-tasks and I see no indication of data skew. When the checkpoint times out, 
none of the sub-tasks have acknowledged the checkpoint.

The problematic task that is failing very often (and holding back downstream 
tasks) consists of the following operations:

timestampedEventStream = events
.keyBy(keySelector)
.assignTimestampsAndWatermarks(watermarkStrategy);

windowedStream = 
DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream, keySelector)
.window(SlidingEventTimeWindows.of(
Time.minutes(windowLengthMinutes),
Time.minutes(windowSlideTimeMinutes)))
.allowedLateness(Time.minutes(allowedLatenessMinutes));

windowedStream
.process(new ProcessWindowFunction1(config))
// add sink

windowedStream
.process(new ProcessWindowFunction2(config))
// add sink

Both window functions are using managed state, but nothing out of the ordinary 
(as mentioned above, state size is actually very small). Do note that the same 
windowedStream is used twice.

I don't see any obvious runtime issues and I don't think the load is 
particularly high, but maybe there's something wrong in my pipeline definition? 
What else could cause these timeouts?

Regards,
Alexis.



RE: Troubleshooting checkpoint timeout

2021-10-20 Thread Alexis Sarda-Espinosa
Currently the windows are 10 minutes in size with a 1-minute slide time. The 
approximate 500 event/minute throughput is already rather high for my use case, 
so I don’t expect it to be higher, but I would imagine that’s still pretty low.

I did have some issues with storage space, and I wouldn’t be surprised if there 
is an IO bottleneck in my dev environment, but then my main question would be: 
if IO is being throttled, could that result in the high “start delay” times I 
observe? That seems to be the main slowdown, so I just want to be sure I’m 
looking in the right direction.

I’d like to mention another thing about my pipeline’s structure in case it’s 
relevant, although it may be completely unrelated. I said that I specify the 
windowing properties once (windowedStream in my 1st e-mail) and use it twice, 
but it’s actually used 3 times. In addition to the 2 ProcessWindowFunctions 
that end in sinks, the stream is also joined with a side output:

openedEventsTimestamped = openedEvents
.getSideOutput(…)
.keyBy(keySelector)
.assignTimestampsAndWatermarks(watermarkStrategy)

windowedStream
.process(ProcessWindowFunction3())
.keyBy(keySelector)

.connect(DataStreamUtils.reinterpretAsKeyedStream(openedEventsTimestamped, 
keySelector))
.process(...)

Could this lead to delays or alignment issues?

Regards,
Alexis.

From: Parag Somani 
Sent: Mittwoch, 20. Oktober 2021 09:22
To: Caizhi Weng 
Cc: Alexis Sarda-Espinosa ; Flink ML 

Subject: Re: Troubleshooting checkpoint timeout

I had a similar problem, where two concurrent checkpoints were configured. 
Also, I used to save them in S3 (using MinIO) on a k8s 1.18 environment.

The Flink service was getting restarted and timeouts were happening. It got resolved by:
1. Fixing MinIO running out of disk space, which had caused the checkpoint 
failures (this was the main cause).
2. Adding checkpoint pacing parameters to address it:
execution.checkpointing.max-concurrent-checkpoints and 
execution.checkpointing.min-pause
Details of the same at:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#checkpointing


On Wed, Oct 20, 2021 at 7:50 AM Caizhi Weng 
mailto:tsreape...@gmail.com>> wrote:
Hi!

I see you're using sliding event time windows. What's the exact value of 
windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is large 
and windowSlideTimeMinutes is small then each record may be assigned to a large 
number of windows as the pipeline proceeds, thus gradually slows down 
checkpointing and finally causes a timeout.

Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 于2021年10月19日周二 下午7:29写道:
Hello everyone,

I am doing performance tests for one of our streaming applications and, after 
increasing the throughput a bit (~500 events per minute), it has started 
failing because checkpoints cannot be completed within 10 minutes. The Flink 
cluster is not exactly under my control and is running on Kubernetes with 
version 1.11.3 and RocksDB backend.

I can access the UI and logs and have confirmed:


  *   Logs do indicate expired checkpoints.
  *   There is no backpressure in any operator.
  *   When checkpoints do complete (seemingly at random):

 *   Size is 10-20MB.
 *   Sync and Async durations are at most 1-2 seconds.
 *   In one of the tasks, alignment takes 1-3 minutes, but start delays 
grow to up to 5 minutes.

  *   The aforementioned task (the one with 5-minute start delay) has 8 
sub-tasks and I see no indication of data skew. When the checkpoint times out, 
none of the sub-tasks have acknowledged the checkpoint.

The problematic task that is failing very often (and holding back downstream 
tasks) consists of the following operations:

timestampedEventStream = events
.keyBy(keySelector)
.assignTimestampsAndWatermarks(watermarkStrategy);

windowedStream = 
DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream, keySelector)
.window(SlidingEventTimeWindows.of(
Time.minutes(windowLengthMinutes),
Time.minutes(windowSlideTimeMinutes)))
.allowedLateness(Time.minutes(allowedLatenessMinutes));

windowedStream
.process(new ProcessWindowFunction1(config))
// add sink

windowedStream
.process(new ProcessWindowFunction2(config))
// add sink

Both window functions are using managed state, but nothing out of the ordinary 
(as mentioned above, state size is actually very small). Do note that the same 
windowedStream is used twice.

I don’t see any obvious runtime issues and I don’t think the load is 
particularly high, but maybe there’s something wrong in my pipeline definition? 
What else could cause these timeouts?

Regards,
Alexis.



--
Regards,
Parag Surajmal Somani.


Re: Kubernetes HA - Reusing storage dir for different clusters

2021-10-08 Thread Alexis Sarda-Espinosa
Hi Yang,

thanks for the confirmation. If I manually stop the job by deleting the 
Kubernetes deployment before it completes, I suppose the files will not be 
cleaned up, right? That's a somewhat non-standard scenario, so I wouldn't 
expect Flink to clean up, I just want to be sure.

Regards,
Alexis.


From: Yang Wang 
Sent: Friday, October 8, 2021 5:24 AM
To: Alexis Sarda-Espinosa 
Cc: Flink ML 
Subject: Re: Kubernetes HA - Reusing storage dir for different clusters

When the Flink job reached to global terminal state(FAILED, CANCELED, 
FINISHED), all the HA related data(including pointers in ConfigMap and concrete 
data in DFS) will be cleaned up automatically.

Best,
Yang

Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 于2021年10月4日周一 下午3:59写道:

Hello,



If I deploy a Flink-Kubernetes application with HA, I need to set 
high-availability.storageDir. If my application is a batch job that may run 
multiple times with the same configuration, do I need to manually clean up the 
storage dir between each execution?



Regards,

Alexis.




Kubernetes HA - Reusing storage dir for different clusters

2021-10-04 Thread Alexis Sarda-Espinosa
Hello,

If I deploy a Flink-Kubernetes application with HA, I need to set 
high-availability.storageDir. If my application is a batch job that may run 
multiple times with the same configuration, do I need to manually clean up the 
storage dir between each execution?

Regards,
Alexis.



RE: Troubleshooting checkpoint timeout

2021-10-25 Thread Alexis Sarda-Espinosa
Hi Piotrek,

Thanks for all the information, I guess I was reading older versions of the 
documentation that didn’t have that.

I was just using the job graph UI to check backpressure, but after looking at 
other factors, I think there is indeed some backpressure, but I don’t know how 
it builds up (there’s none at the beginning of the job). I can’t easily upgrade 
the Flink version just yet, so I don’t have access to all the new facilities 
but based on what I do have, here are some additional remarks/questions.

There’s a good chance the problem begins when the job starts running out of 
(heap) memory and the GC introduces delays. That’s of course independent of 
Flink and I’ll have to look at the cause, but even if I increase available 
memory, I still see delays (at least for some time); I know this because one of 
my operators uses timers and logs their timestamps, and I can see the timer 
timestamps lagging clock time by up to 1 hour. Since the logs don’t indicate 
the operator’s logic takes a significant amount of time and CPU is far below 
the available limit (the single TM barely uses more than 1 CPU out of 4), I’d 
guess the lag could be related to checkpoint alignment, which takes me to my 
questions:


  1.  The documentation states “Operators that receive more than one input 
stream need to align the input streams on the snapshot barriers”. If an 
operator has parallelism > 1, does that count as more than one stream? Or is 
there a single output barrier for all subtask outputs that gets “copied” to all 
downstream subtask inputs?
  2.  Similarly, alignment duration is said to be “The time between processing 
the first and the last checkpoint barrier”. What exactly is the interpretation 
of “first” and “last” here? Do they relate to a checkpoint “n” where “first” 
would be the barrier for n-1 and “last” the one for n?
  3.  Start delay also refers to the “first checkpoint barrier to reach this 
subtask”. As before, what is “first” in this context?
  4.  Maybe this will be answered by the previous questions, but what happens 
to barriers if a downstream operator has lower parallelism?

Regards,
Alexis.

From: Piotr Nowojski 
Sent: Montag, 25. Oktober 2021 09:59
To: Alexis Sarda-Espinosa 
Cc: Parag Somani ; Caizhi Weng ; 
Flink ML 
Subject: Re: Troubleshooting checkpoint timeout

Hi Alexis,

You can read about those metrics in the documentation [1]. Long alignment 
duration and start delay almost always come together. High values indicate long 
checkpoint barrier propagation times through the job graph, that's always (at 
least so far I haven't seen a different reason) caused by the same thing: 
backpressure. Which brings me to

> There is no backpressure in any operator.

Why do you think so?

For analysing backpressure I would highly recommend upgrading to Flink 1.13.x 
as it has greatly improved tooling for that [2]. Since Flink 1.10 I believe you 
can use the `isBackPressured` metric. In previous versions you would have to 
rely on buffer usage metrics as described here [3].

If this is indeed a problem with a backpressure, there are three things you 
could do to improve checkpointing time:
a) Reduce the backpressure, either by optimising your job/code or scaling up.
b) Reduce the amount of in-flight data. Since Flink 1.14.x, Flink can do it 
automatically when buffer debloating is enabled, but the same principle could 
be used to manually and statically configure cluster to have less in-flight 
data. You can read about this here [4].
c) Enable unaligned checkpoints [5].
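
As a small sketch of how (c) and the checkpoint pacing options map to the 
DataStream API (the values below are placeholders, pick them for your job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);                                 // checkpoint every minute
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);     // the 10-minute timeout you mentioned
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // execution.checkpointing.min-pause
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        // execution.checkpointing.max-concurrent-checkpoints
        env.getCheckpointConfig().enableUnalignedCheckpoints();          // option (c) above
    }
}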

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
[2] https://flink.apache.org/2021/07/07/backpressure.html
[3] 
https://flink.apache.org/2019/07/23/flink-network-stack-2.html#network-metrics
[4] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/#the-buffer-debloating-mechanism
[5] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints

Best,
Piotrek

czw., 21 paź 2021 o 19:00 Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 napisał(a):
I would really appreciate more fine-grained information regarding the factors 
that can affect a checkpoint’s:


  *   Sync duration
  *   Async duration
  *   Alignment duration
  *   Start delay

Otherwise those metrics don’t really help me know in which areas to look for 
issues.

Regards,
Alexis.

From: Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
Sent: Mittwoch, 20. Oktober 2021 09:43
To: Parag Somani mailto:somanipa...@gmail.com>>; Caizhi 
Weng mailto:tsreape...@gmail.com>>
Cc: Flink ML mailto:user@flink.apache.org>>
Subject: RE: Troubleshooting checkpoint timeout

Currently the windows are 10 minutes in size with a 1-minute slide time. The 
approximate 500 event/minute throughput is already rather high for my use case, 
so I don’t expect it to b

Watermark behavior when connecting streams

2021-12-01 Thread Alexis Sarda-Espinosa
Hi everyone,

Based on what I know, a single operator with parallelism > 1 checks the 
watermarks from all its streams and uses the smallest one out of the non-idle 
streams. My first question is whether watermarks are forwarded as long as a 
different watermark strategy is not applied downstream? For example, will my 
stream keep its watermarks even after windowing + processing?

The second question is what happens with watermarks after connecting 2 streams, 
specifically these 2 possibilities:

- One stream has watermarks but the other one doesn't.
- Both streams have watermarks.

Regards,
Alexis.



RE: Using POJOs with the table API

2021-10-26 Thread Alexis Sarda-Espinosa
Hello,

I've found a ticket that talks about very high-level improvements to the Table 
API [1]. Are there any more concrete pointers for migration from DataSet to 
Table API? Will it be possible at all to use POJOs with the Table API?

[1] https://issues.apache.org/jira/browse/FLINK-20787

Regards,
Alexis.

From: Alexis Sarda-Espinosa 
Sent: Donnerstag, 5. August 2021 15:49
To: user@flink.apache.org
Subject: Using POJOs with the table API

Hi everyone,

I had been using the DataSet API until now, but since that's been deprecated, I 
started looking into the Table API. In my DataSet job I have a lot of POJOs, 
all of which are even annotated with @TypeInfo and provide the corresponding 
factories. The Table API documentation talks about POJOs here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#user-defined-data-types

I started with a single UDF to try it out (an AggregateFunction), but I have 
encountered several issues.

My accumulator class is a (serializable) POJO, but initially there were 
validation exceptions because the specified class is abstract. I added this to 
the class and got over that:

@FunctionHint(
accumulator = DataTypeHint("RAW", bridgedTo = MyAccumulator.class)
)

Then there were exceptions about the output type. Since it's also a POJO, I 
thought this would help:

@FunctionHint(
accumulator = DataTypeHint("RAW", bridgedTo = MyAccumulator::class),
output = DataTypeHint("RAW", bridgedTo =  MyDTO.class)
)

But no luck: org.apache.flink.table.api.TableException: Unsupported conversion 
from data type 'RAW('com.MyDTO', '...')' (conversion class: com.MyDTO) to type 
information. Only data types that originated from type information fully 
support a reverse conversion.

I figured I would try something simpler and first return a List<String> from my 
AggregateFunction. But how do I define that in a DataTypeHint? I'm not sure if 
that's documented, but I looked through LogicalTypeParser and used:

output = DataTypeHint("ARRAY")

But that throws an exception (see attachment): Table program cannot be 
compiled. This is a bug. Please file an issue.

I changed the List to String[] and that finally worked.

Even getting a simple test running was difficult. I simply could not get 
TableEnvironment#fromValues to work with POJOs as input, and I tried many 
combinations of DataTypes.of(MyPojo.class)

At this point I still don't know how to return complex data structures 
encapsulated in POJOs from my UDF. Am I missing something very essential?

Regards,
Alexis.



RE: Troubleshooting checkpoint timeout

2021-10-25 Thread Alexis Sarda-Espinosa
Oh, I got it. I should’ve made the connection earlier after you said “Once an 
operator decides to send/broadcast a checkpoint barrier downstream, it just 
broadcasts it to all output channels”.

I’ll see what I can do about upgrading the Flink version and do some more tests 
with unaligned checkpoints. Thanks again for all the info.

Regards,
Alexis.

From: Piotr Nowojski 
Sent: Montag, 25. Oktober 2021 15:51
To: Alexis Sarda-Espinosa 
Cc: Parag Somani ; Caizhi Weng ; 
Flink ML 
Subject: Re: Troubleshooting checkpoint timeout

Hi Alexis,

>  Should I understand these metrics as a property of an operator and not of 
> each subtask (at least for aligned checkpoints)? Then “first” and “last” 
> would make sense to me: first/last across all subtasks/channels for a given 
> operator.

Those are properties of a subtask. Subtasks are a collection of chained 
parallel instances of operators. If you have a simple job like 
`source.keyBy(...).window(...).process(...)`, with parallelism of 10, you will 
have two tasks. Each task will have 10 subtasks. Each subtask will have only a 
single element operator chain, with a single operator (either source operator 
for the source task/subtasks, or window/process function for the second task). 
If you add a sink to your job 
`source.keyBy(...).window(...).process(...).addSink(...)`, this sink will be 
chained with the window/process operator. You will still end up with two tasks:

1. Source
2. Window -> Sink

again, each will have 10 subtasks, with parallel instances of the respective 
operators.
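
As a compilable sketch of that example (the source and window logic are just 
placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class TaskChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStream<Long> source = env.fromSequence(0, 1_000_000);   // task 1: "Source", 10 subtasks

        source.keyBy(value -> value % 10)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<Long, Long, Long, TimeWindow>() {
                    @Override
                    public void process(Long key, Context ctx, Iterable<Long> elements, Collector<Long> out) {
                        long count = 0;
                        for (Long ignored : elements) {
                            count++;
                        }
                        out.collect(count);
                    }
                })
                .addSink(new DiscardingSink<>());                    // chained: task 2 is "Window -> Sink"

        env.execute("task-chaining-sketch");
    }
}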

So if you look at the "alignment duration" of a subtask from "2. Window -> 
Sink" task, that will be the difference between receiving a first checkpoint 
barrier from any of the "1. Source" subtasks and the last checkpoint barrier 
from those "1. Source" subtasks.

> Naturally, for unaligned checkpoints, alignment duration isn’t applicable, 
> but what about Start Delay? I imagine that might indeed be a property of the 
> subtask and not the operator.

As per the docs that I've already linked [1]

Alignment Duration: The time between processing the first and the last 
checkpoint barrier. For aligned checkpoints, during the alignment, the channels 
that have already received checkpoint barriers are blocked from processing more 
data.

This number is also defined the same way for the unaligned checkpoints. Even 
with unaligned checkpoints a subtask needs to wait for receiving all of the 
checkpoint barriers before completing the checkpoint. However, as a subtask can 
broadcast the checkpoint barrier downstream immediately upon receiving the 
first checkpoint barrier AND those checkpoint barriers are able to overtake 
in-flight data, the propagation happens very very quickly for the most part. 
Hence alignment duration and start delay in this case should be very small, 
unless you have deeper problems like long GC pauses.
> If I’m understanding the aligned checkpoint mechanism correctly, after the 
> first failure the job restarts and tries to read, let’s say, the last 5 
> minutes of data. Then it fails again because the checkpoint times out and, 
> after restarting, would it try to read, for example, 15 minutes of data? If 
> there was no backpressure in the source, it could be that the new checkpoint 
> barriers created after the first restart are behind more data than before it 
> restarted, no?

I'm not sure if I understand. But yes. It's a valid scenario that:

1. timestamp t1, checkpoint 42 completes
2. failure happens at timestamp t1 + 10 minutes.
3. timestamp t2, job is recovered to checkpoint 42.
4. timestamp t2 + 5 minutes, checkpoint 43 is triggered.

Between 1. and 2., your job could have processed more records than between 3. 
and 4.

Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/

pon., 25 paź 2021 o 15:02 Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 napisał(a):
Hi again,

Thanks a lot for taking the time to clarify this. I think that the main thing 
that is confusing me is that the UI shows Alignment Duration and other 
checkpoint metrics for each subtask, and the resources you’ve sent always 
discuss a single barrier per subtask channel. Should I understand these metrics 
as a property of an operator and not of each subtask (at least for aligned 
checkpoints)? Then “first” and “last” would make sense to me: first/last across 
all subtasks/channels for a given operator.

Naturally, for unaligned checkpoints, alignment duration isn’t applicable, but 
what about Start Delay? I imagine that might indeed be a property of the 
subtask and not the operator.

With respect to my problem, I can also add that my job reads data from Pulsar, 
so some of it is buffered in the message bus. If I’m understanding the aligned 
checkpoint mechanism correctly, after the first failure the job re

RE: Troubleshooting checkpoint timeout

2021-10-25 Thread Alexis Sarda-Espinosa
Hi again,

Thanks a lot for taking the time to clarify this. I think that the main thing 
that is confusing me is that the UI shows Alignment Duration and other 
checkpoint metrics for each subtask, and the resources you’ve sent always 
discuss a single barrier per subtask channel. Should I understand these metrics 
as a property of an operator and not of each subtask (at least for aligned 
checkpoints)? Then “first” and “last” would make sense to me: first/last across 
all subtasks/channels for a given operator.

Naturally, for unaligned checkpoints, alignment duration isn’t applicable, but 
what about Start Delay? I imagine that might indeed be a property of the 
subtask and not the operator.

With respect to my problem, I can also add that my job reads data from Pulsar, 
so some of it is buffered in the message bus. If I’m understanding the aligned 
checkpoint mechanism correctly, after the first failure the job restarts and 
tries to read, let’s say, the last 5 minutes of data. Then it fails again 
because the checkpoint times out and, after restarting, would it try to read, 
for example, 15 minutes of data? If there was no backpressure in the source, it 
could be that the new checkpoint barriers created after the first restart are 
behind more data than before it restarted, no?

Regards,
Alexis.

From: Piotr Nowojski 
Sent: Montag, 25. Oktober 2021 13:35
To: Alexis Sarda-Espinosa 
Cc: Parag Somani ; Caizhi Weng ; 
Flink ML 
Subject: Re: Troubleshooting checkpoint timeout

Hi again Alexis,

First answering your questions:
> 1. The documentation states “Operators that receive more than one input 
> stream need to align the input streams on the snapshot barriers”. If an 
> operator has parallelism > 1, does that count as more than one stream? Or is 
> there a single output barrier for all subtask outputs that gets “copied” to 
> all downstream subtask inputs?

Yes, in this context "more than one input" means more than one 
input/communication channels, for example from multiple upstream parallel 
operators. You can read about that in some blog posts [1] or presentations [2]

> 2. Similarly, alignment duration is said to be “The time between processing 
> the first and the last checkpoint barrier”. What exactly is the 
> interpretation of “first” and “last” here? Do they relate to a checkpoint “n” 
> where “first” would be the barrier for n-1 and “last” the one for n?

No. It means the difference between observing the first and last checkpoint 
barrier from the same checkpoint "n". If you are still not sure, I believe if 
you read/listen the resources I pointed out in the previous response (or google 
for "flink checkpointing") it should clear things out :)

> 3. Start delay also refers to the “first checkpoint barrier to reach this 
> subtask”. As before, what is “first” in this context?

Again, the same answer as above :) In short checkpoint barriers (for one given 
checkpoint "n"), are injected in the sources - in all parallel instances of the 
source operators. From there, they are traveling through the job graph to 
downstream operators. Once a downstream operator receives the first checkpoint 
barrier (from checkpoint "n"), the alignment phase begins. Note that if a 
downstream operator has parallelism of 100, it will most likely have 100 
network inputs, and it will be waiting for 100 different checkpoint barriers 
(one from each upstream operator) during the alignment phase. After receiving 
the 100th checkpoint barrier (still checkpoint "n"), the alignment phase 
completes. The difference between "aligned" and "unaligned" checkpoints comes 
down to when such operator broadcasts to it's outputs checkpoint barrier for 
the downstream operators. In aligned checkpoints such checkpoint barriers are 
broadcasted at the end of alignment phase. In unaligned checkpoints it happens 
after seeing the first checkpoint barrier.
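
If it helps, here is a plain-Java illustration (not the actual Flink code) of 
the alignment bookkeeping described above:

class BarrierAlignmentTracker {
    private final int numInputChannels;
    private int barriersSeen;
    private long firstBarrierNanos = -1;

    BarrierAlignmentTracker(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    // Returns true once the barrier from the last input channel has arrived. With aligned
    // checkpoints the barrier is only forwarded downstream at that point; with unaligned
    // checkpoints it is forwarded already after the first barrier.
    boolean onBarrier() {
        if (barriersSeen == 0) {
            firstBarrierNanos = System.nanoTime(); // start of the alignment phase
        }
        barriersSeen++;
        return barriersSeen == numInputChannels;
    }

    // "Alignment duration": time between the first and the last barrier of this checkpoint.
    long alignmentDurationNanos() {
        return System.nanoTime() - firstBarrierNanos;
    }
}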

> 4. Maybe this will be answered by the previous questions, but what happens to 
> barriers if a downstream operator has lower parallelism?

If there is a different number of input and output channels it doesn't matter. 
Once an operator decides to send/broadcast a checkpoint barrier downstream, it 
just broadcasts it to all output channels.

Coming back to your problem:

> There’s a good chance the problem begins when the job starts running out of 
> (heap) memory and the GC introduces delays. That’s of course independent of 
> Flink and I’ll have to look at the cause

This could easily explain an excessive back pressure. Note that if you have 
long GC pauses, it can affect accuracy of the metrics. Keep in mind that you 
can always attach a JVM profiler to the Flink's process to analyse easier 
what's happening there.

> I guess I was reading older versions of the documentation that didn’t have 
> that.

The documentation is evolving and we are t

RE: Watermark behavior when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
Hi David,

If watermarks are ignored, how do consecutive windowed operations [1] work? I’m 
just trying to understand in which scenarios I need to assign timestamps and 
watermarks, or whether it’s enough to do it once near the beginning of the DAG 
(assuming the source doesn’t do it).

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#consecutive-windowed-operations

Regards,
Alexis.

From: David Morávek 
Sent: Donnerstag, 2. Dezember 2021 17:26
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Watermark behavior when connecting streams

Hi Alexis,

please take a look at AbstractStreamOperator [1] for details on how the 
watermark is calculated for a two-input operator. It uses pretty much the same 
approach as the single-input one (it simply takes the minimum).

For watermark re-assignment, we ignore the input watermark unless it is 
Long.MAX_VALUE (this happens on shutdown, e.g. savepoint + drain). You can see 
more details by looking at the TimestampsAndWatermarksOperator [2].
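
As a plain-Java illustration of that rule (not the Flink implementation itself, 
just the min-combination it boils down to):

class TwoInputWatermarkCombiner {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // Called when a watermark arrives on the first input.
    Long processWatermark1(long watermark) {
        input1Watermark = watermark;
        return advance();
    }

    // Called when a watermark arrives on the second input.
    Long processWatermark2(long watermark) {
        input2Watermark = watermark;
        return advance();
    }

    // The combined watermark is the minimum of both inputs; it is only emitted when it advances.
    private Long advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return combinedWatermark; // this value would be forwarded downstream
        }
        return null;                  // nothing to emit
    }
}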

[1] 
https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L608
[2] 
https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java#L124

Best,
D.

On Wed, Dec 1, 2021 at 9:49 AM Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 wrote:
Hi everyone,

Based on what I know, a single operator with parallelism > 1 checks the 
watermarks from all its streams and uses the smallest one out of the non-idle 
streams. My first question is whether watermarks are forwarded as long as a 
different watermark strategy is not applied downstream? For example, will my 
stream keep its watermarks even after windowing + processing?

The second question is what happens with watermarks after connecting 2 streams, 
specifically these 2 possibilities:

- One stream has watermarks but the other one doesn’t.
- Both streams have watermarks.

Regards,
Alexis.



RE: Buffering when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
I do have some logic with timers today, but it’s indeed not ideal. I guess I’ll 
have a look at TwoInputStreamOperator, but I do have related questions. You 
mentioned a sample scenario of "processing backlog" where windows fire very 
quickly; could it happen that, in such a situation, the framework calls the 
operator’s processElement1 continuously (even for several minutes) before 
calling processElement2 a single time? How does the framework decide when to 
switch the stream processing when the streams are connected?

Regards,
Alexis.

From: David Morávek 
Sent: Donnerstag, 2. Dezember 2021 17:18
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

Even with a TwoInputStreamOperator you cannot "halt" the processing. You need 
to buffer these elements, for example in ListState, for later processing. Once 
the watermark of the second stream arrives, you can process all buffered 
elements that satisfy the condition.

You could probably also implement a similar (less optimized) solution with a 
KeyedCoProcessFunction using event time timers.
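
A rough sketch of that variant (the types and buffered payload are placeholders; 
a fuller version would only release the results whose timestamp is covered by 
the firing timer):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Assumes timestamps/watermarks are assigned upstream on both inputs.
class BufferingJoin extends KeyedCoProcessFunction<String, String, String, String> {

    private transient ListState<String> pendingWindowResults;

    @Override
    public void open(Configuration parameters) {
        pendingWindowResults = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-window-results", String.class));
    }

    @Override
    public void processElement1(String windowResult, Context ctx, Collector<String> out) throws Exception {
        // Buffer the window result and register a timer so it is only emitted once the
        // watermark (and therefore the second input) has caught up to its timestamp.
        pendingWindowResults.add(windowResult);
        ctx.timerService().registerEventTimeTimer(ctx.timestamp());
    }

    @Override
    public void processElement2(String sideEvent, Context ctx, Collector<String> out) {
        // Update whatever keyed state the buffered results depend on (e.g. open/close flags).
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // The watermark has passed 'timestamp': emit the buffered results for this key.
        for (String result : pendingWindowResults.get()) {
            out.collect(result);
        }
        pendingWindowResults.clear();
    }
}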

Best,
D.

On Thu, Dec 2, 2021 at 5:12 PM Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 wrote:
Yes, that sounds right, but with my current KeyedCoProcessFunction I can’t tell 
Flink to "halt" processElement1 and switch to the other stream depending on 
watermarks. I could look into TwoInputStreamOperator if you think that’s the 
best approach.

Regards,
Alexis.

From: David Morávek mailto:d...@apache.org>>
Sent: Donnerstag, 2. Dezember 2021 16:59
To: Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Buffering when connecting streams

I think this would require using a lower-level API and implementing a custom 
`TwoInputStreamOperator`. Then you can hook into the `processWatermark{1,2}` 
methods.

Let's also make sure we're on the same page on what the watermark is. You can 
think of the watermark as an event time clock. It basically gives you the 
information that no more events with a timestamp lower than the watermark 
should appear in your stream.

You simply delay emitting the window result from your "connect" operator until 
the watermark from the second (side output) stream passes the window's max 
timestamp (the maximum timestamp that is included in the window).

Does that make sense?

Best,
D.

On Thu, Dec 2, 2021 at 4:25 PM Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 wrote:
Could you elaborate on what you mean with synchronize? Buffering in the state 
would be fine, but I haven’t been able to come up with a good way of ensuring 
that all data from the side stream for a given minute is processed by 
processElement2 before all data for the same (windowed) minute reaches 
processElement1, even when considering watermarks.

Regards,
Alexis.

From: David Morávek mailto:d...@apache.org>>
Sent: Donnerstag, 2. Dezember 2021 15:45
To: Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Buffering when connecting streams

You cannot rely on the order of the two streams that easily. If you are, for 
example, processing a backlog and the windows fire quickly, it can happen that 
this branch is actually faster than the second branch, which has less work to 
do. This will make the pipeline non-deterministic.

What you can do is "synchronize" the watermarks of both streams in your 
"connect" operator, but that of course involves buffering events in state.

Best,
D.

On Thu, Dec 2, 2021 at 3:02 PM Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 wrote:
Hi David,

A watermark step simply refers to assigning timestamps and watermarks, my 
source doesn’t do that.

I have a test system with only a couple of data points per day, so there’s 
definitely no back pressure. I basically have a browser where I can see the 
results from the sink, and I found one result that should have been discarded 
but wasn’t, which makes me think that the operator processed the "open" state 
but waited too long and didn’t process the "close" state before the window 
fired. I can also see that the closure (going from open to close) triggered on 
second 17, and my windows are evaluated every minute, so it wasn’t a race 
condition.

Regards,
Alexis.

From: David Morávek mailto:d...@apache.org>>
Sent: Donnerstag, 2. Dezember 2021 14:52
To: Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Buffering when connecting streams

Hi Alexis,

I'm not sure what "watermark" step refers to in you graph, but in general I'd 
say your intuition is correct.

For the "buffering" part, each sub-

RE: Buffering when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
Yes, that sounds right, but with my current KeyedCoProcessFunction I can’t tell 
Flink to "halt" processElement1 and switch to the other stream depending on 
watermarks. I could look into TwoInputStreamOperator if you think that’s the 
best approach.

Regards,
Alexis.

From: David Morávek 
Sent: Thursday, December 2, 2021 16:59
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

I think this would require using the lower-level API and implementing a custom 
`TwoInputStreamOperator`. Then you can hook into the `processWatermark{1,2}` methods.

Let's also make sure we're on the same page about what the watermark is. You can 
think of the watermark as an event-time clock. It basically tells you that no more 
events with a timestamp lower than the watermark should appear in your stream.

You simply delay emitting the window result from your "connect" operator until 
the watermark from the second (side output) stream passes the window's max 
timestamp (the maximum timestamp that is included in the window).

Does that make sense?

Best,
D.

On Thu, Dec 2, 2021 at 4:25 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
Could you elaborate on what you mean with synchronize? Buffering in the state 
would be fine, but I haven’t been able to come up with a good way of ensuring 
that all data from the side stream for a given minute is processed by 
processElement2 before all data for the same (windowed) minute reaches 
processElement1, even when considering watermarks.

Regards,
Alexis.

From: David Morávek <d...@apache.org>
Sent: Thursday, December 2, 2021 15:45
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

You cannot rely on the order of the two streams that easily. If you are, for 
example, processing backlog and the windows fire quickly, it can happen that 
this branch is actually faster than the second branch, which has less work to 
do. This will make the pipeline non-deterministic.

What you can do is "synchronize" the watermarks of both streams in your 
"connect" operator, but that of course involves buffering events in state.

Best,
D.

On Thu, Dec 2, 2021 at 3:02 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
Hi David,

The watermark step simply refers to assigning timestamps and watermarks; my 
source doesn't do that.

I have a test system with only a couple of data points per day, so there’s 
definitely no back pressure. I basically have a browser where I can see the 
results from the sink, and I found one result that should have been discarded 
but wasn’t, which makes me think that the operator processed the "open" state 
but waited too long and didn’t process the "close" state before the window 
fired. I can also see that the closure (going from open to close) triggered on 
second 17, and my windows are evaluated every minute, so it wasn’t a race 
condition.

Regards,
Alexis.

From: David Morávek <d...@apache.org>
Sent: Thursday, December 2, 2021 14:52
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

Hi Alexis,

I'm not sure what "watermark" step refers to in you graph, but in general I'd 
say your intuition is correct.

For the "buffering" part, each sub-task needs to send data via data exchange 
(last operator in chain) has an output buffer and the operator that consumes 
this data (maybe on different machine) has an input buffer (buffer de-bloating 
feature can help to mitigate excessive buffering in case of back-pressure).

but I’m not sure if this actually happens

How are you trying to verify this? Also can you check whether the operators are 
not back-pressured?

Best,
D.

On Thu, Dec 2, 2021 at 12:27 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
Hello,

I have a use case with event-time processing that ends up with a DAG roughly 
like this:

source -> filter -> keyBy -> watermark -> window -> process -> keyBy_2 -> connect (KeyedCoProcessFunction) -> sink
             |                                                                 ^
             +--> (side output) -> keyBy -> watermark -------------------------+


(In case the text gets mangled in the e-mail, the side output comes from the 
filter and joins back with the connect operation)

The filter takes all data and its main output is all _valid_ data with state 
"open"; the side output is all _valid_ data regardless of state.

The goal of the KeyedCoProcessFunction is to check the results of the (sliding) 
window. The window only receives open states, but KeyedCoProcessFun

RE: Buffering when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
Could you elaborate on what you mean with synchronize? Buffering in the state 
would be fine, but I haven’t been able to come up with a good way of ensuring 
that all data from the side stream for a given minute is processed by 
processElement2 before all data for the same (windowed) minute reaches 
processElement1, even when considering watermarks.

Regards,
Alexis.

From: David Morávek 
Sent: Thursday, December 2, 2021 15:45
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

You cannot rely on the order of the two streams that easily. If you are, for 
example, processing backlog and the windows fire quickly, it can happen that 
this branch is actually faster than the second branch, which has less work to 
do. This will make the pipeline non-deterministic.

What you can do is "synchronize" the watermarks of both streams in your 
"connect" operator, but that of course involves buffering events in state.

Best,
D.

On Thu, Dec 2, 2021 at 3:02 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
Hi David,

The watermark step simply refers to assigning timestamps and watermarks; my 
source doesn't do that.

I have a test system with only a couple of data points per day, so there’s 
definitely no back pressure. I basically have a browser where I can see the 
results from the sink, and I found one result that should have been discarded 
but wasn’t, which makes me think that the operator processed the "open" state 
but waited too long and didn’t process the "close" state before the window 
fired. I can also see that the closure (going from open to close) triggered on 
second 17, and my windows are evaluated every minute, so it wasn’t a race 
condition.

Regards,
Alexis.

From: David Morávek <d...@apache.org>
Sent: Thursday, December 2, 2021 14:52
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

Hi Alexis,

I'm not sure what "watermark" step refers to in you graph, but in general I'd 
say your intuition is correct.

For the "buffering" part, each sub-task needs to send data via data exchange 
(last operator in chain) has an output buffer and the operator that consumes 
this data (maybe on different machine) has an input buffer (buffer de-bloating 
feature can help to mitigate excessive buffering in case of back-pressure).

but I’m not sure if this actually happens

How are you trying to verify this? Also can you check whether the operators are 
not back-pressured?

Best,
D.

On Thu, Dec 2, 2021 at 12:27 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
Hello,

I have a use case with event-time processing that ends up with a DAG roughly 
like this:

source -> filter -> keyBy -> watermark -> window -> process -> keyBy_2 -> connect (KeyedCoProcessFunction) -> sink
             |                                                                 ^
             +--> (side output) -> keyBy -> watermark -------------------------+


(In case the text gets mangled in the e-mail, the side output comes from the 
filter and joins back with the connect operation)

The filter takes all data and its main output is all _valid_ data with state 
"open"; the side output is all _valid_ data regardless of state.

The goal of the KeyedCoProcessFunction is to check the results of the (sliding) 
window. The window only receives open states, but KeyedCoProcessFunction 
receives all valid data and should discard results from the main stream if 
states changed from "open" to something else before the window was evaluated.

I would have expected all data from the side output to be processed roughly 
immediately in KeyedCoProcessFunction’s processElement2 because there’s no 
windowing in the side stream, but I’m not sure if this actually happens, maybe 
the side stream (or both streams) buffers some data before passing it to the 
connected stream? If yes, is there any way I could tune this?

Regards,
Alexis.



Buffering when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
Hello,

I have a use case with event-time processing that ends up with a DAG roughly 
like this:

source -> filter -> keyBy -> watermark -> window -> process -> keyBy_2 -> connect (KeyedCoProcessFunction) -> sink
             |                                                                 ^
             +--> (side output) -> keyBy -> watermark -------------------------+


(In case the text gets mangled in the e-mail, the side output comes from the 
filter and joins back with the connect operation)

The filter takes all data and its main output is all _valid_ data with state 
"open"; the side output is all _valid_ data regardless of state.

The goal of the KeyedCoProcessFunction is to check the results of the (sliding) 
window. The window only receives open states, but KeyedCoProcessFunction 
receives all valid data and should discard results from the main stream if 
states changed from "open" to something else before the window was evaluated.

I would have expected all data from the side output to be processed roughly 
immediately in KeyedCoProcessFunction's processElement2 because there's no 
windowing in the side stream, but I'm not sure if this actually happens, maybe 
the side stream (or both streams) buffers some data before passing it to the 
connected stream? If yes, is there any way I could tune this?

Regards,
Alexis.



RE: Buffering when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
Hi David,

The watermark step simply refers to assigning timestamps and watermarks; my 
source doesn't do that.

I have a test system with only a couple of data points per day, so there’s 
definitely no back pressure. I basically have a browser where I can see the 
results from the sink, and I found one result that should have been discarded 
but wasn’t, which makes me think that the operator processed the "open" state 
but waited too long and didn’t process the "close" state before the window 
fired. I can also see that the closure (going from open to close) triggered on 
second 17, and my windows are evaluated every minute, so it wasn’t a race 
condition.

Regards,
Alexis.

From: David Morávek 
Sent: Thursday, December 2, 2021 14:52
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

Hi Alexis,

I'm not sure what "watermark" step refers to in you graph, but in general I'd 
say your intuition is correct.

For the "buffering" part, each sub-task needs to send data via data exchange 
(last operator in chain) has an output buffer and the operator that consumes 
this data (maybe on different machine) has an input buffer (buffer de-bloating 
feature can help to mitigate excessive buffering in case of back-pressure).

but I’m not sure if this actually happens

How are you trying to verify this? Also can you check whether the operators are 
not back-pressured?

Best,
D.

On Thu, Dec 2, 2021 at 12:27 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
Hello,

I have a use case with event-time processing that ends up with a DAG roughly 
like this:

source -> filter -> keyBy -> watermark -> window -> process -> keyBy_2 -> connect (KeyedCoProcessFunction) -> sink
             |                                                                 ^
             +--> (side output) -> keyBy -> watermark -------------------------+


(In case the text gets mangled in the e-mail, the side output comes from the 
filter and joins back with the connect operation)

The filter takes all data and its main output is all _valid_ data with state 
"open"; the side output is all _valid_ data regardless of state.

The goal of the KeyedCoProcessFunction is to check the results of the (sliding) 
window. The window only receives open states, but KeyedCoProcessFunction 
receives all valid data and should discard results from the main stream if 
states changed from "open" to something else before the window was evaluated.

I would have expected all data from the side output to be processed roughly 
immediately in KeyedCoProcessFunction’s processElement2 because there’s no 
windowing in the side stream, but I’m not sure if this actually happens, maybe 
the side stream (or both streams) buffers some data before passing it to the 
connected stream? If yes, is there any way I could tune this?

Regards,
Alexis.



Re: OOM Metaspace after multiple jobs

2021-07-16 Thread Alexis Sarda-Espinosa
Since I'm running in a container, I was able to copy some of the jars to 
Flink's lib folder. When it comes to gRPC, I don't know if there's any other 
good option due to possible issues with ThreadLocals: 
https://github.com/grpc/grpc-java/issues/8309

Even then, I'm not sure that's a complete solution. I added a class (in the lib 
folder) that logs loaded/unloaded class counts with ClassLoadingMXBean, and 
even though the number of classes loaded increases more slowly with each job, 
it still increases. In a heapdump I took before moving jars to /lib, I could 
see multiple instances (one per job, it seems) of some of my job's classes 
(e.g. sources), and their GC roots were the Flink User Class Loader. I haven't 
figured out why they would remain across different jobs.
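[Editor's note: the class mentioned above is not included in the thread; a minimal sketch of the same idea is shown below. The class name, the label parameter and the use of stdout are placeholders for whatever logging setup the job actually uses.]

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;

// Minimal sketch: log JVM-wide class loading/unloading counts, e.g. once per job run.
public class ClassLoadStats {

    public static void log(String label) {
        ClassLoadingMXBean bean = ManagementFactory.getClassLoadingMXBean();
        System.out.printf(
                "[%s] currently loaded=%d, total loaded=%d, unloaded=%d%n",
                label,
                bean.getLoadedClassCount(),
                bean.getTotalLoadedClassCount(),
                bean.getUnloadedClassCount());
    }
}
```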

Regards,
Alexis.


From: Alexis Sarda-Espinosa 
Sent: Thursday, July 8, 2021 12:51 AM
To: user@flink.apache.org 
Subject: Re: OOM Metaspace after multiple jobs

I now see there have been problems with this in the past:

https://issues.apache.org/jira/browse/FLINK-16142
https://issues.apache.org/jira/browse/FLINK-19005

I actually use both JDBC and gRPC, so it seems this could indeed be an issue 
for me. Does anyone know if I can ensure my classes get cleaned up? In this 
scenario only my jobs would be running in the cluster, so I can have a bit more 
control.

Regards,
Alexis.


From: Alexis Sarda-Espinosa 
Sent: Thursday, July 8, 2021 12:14 AM
To: user@flink.apache.org 
Subject: OOM Metaspace after multiple jobs

Hello,

I am currently testing a scenario where I would run the same job multiple times 
in a loop with different inputs each time. I am testing with a local Flink 
cluster v1.12.4. I initially got an OOM - Metaspace error, so I increased the 
corresponding memory in the TM's JVM (to 512m), but it still fails sometimes.

I found this issue that talked about Python jobs: 
https://issues.apache.org/jira/browse/FLINK-20333, but there is a comment there 
saying that it would also affect Java jobs. The commit linked there seems to be 
concerned with Python only. Was this also fixed in 1.12.0 for Java?

Is there anything I could do to force a more thorough class loader cleanup 
after each call to execute() ?

Regards,
Alexis.




Re: Interval join operator is not forwarding watermarks correctly

2022-03-15 Thread Alexis Sarda-Espinosa
For completeness, this still happens with Flink 1.14.4

Regards,
Alexis.


From: Alexis Sarda-Espinosa 
Sent: Friday, March 11, 2022 12:21 AM
To: user@flink.apache.org 
Cc: pnowoj...@apache.org 
Subject: Re: Interval join operator is not forwarding watermarks correctly

I think I managed to create a reproducible example [1]; it seems to be due to 
the use of window + join + window. When I run the test, I never see the print 
output, but if I uncomment part of the code in the watermark generator to mark 
it as idle more quickly, it starts working after a while.

[1] https://github.com/asardaes/flink-interval-join-test


Regards,
Alexis.


From: Alexis Sarda-Espinosa 
Sent: Thursday, March 10, 2022 7:47 PM
To: user@flink.apache.org 
Cc: pnowoj...@apache.org 
Subject: RE: Interval join operator is not forwarding watermarks correctly


I found [1] and [2], which are closed, but could be related?



[1] https://issues.apache.org/jira/browse/FLINK-23698

[2] https://issues.apache.org/jira/browse/FLINK-18934



Regards,

Alexis.



From: Alexis Sarda-Espinosa 
Sent: Thursday, March 10, 2022 19:27
To: user@flink.apache.org
Subject: Interval join operator is not forwarding watermarks correctly



Hello,



I’m in the process of updating from Flink 1.11.3 to 1.14.3, and it seems the 
interval join in my pipeline is no longer working. More specifically, I have a 
sliding window after the interval join, and the window isn’t firing. After many 
tests, I ended up creating a custom operator that extends IntervalJoinOperator 
and I overrode processWatermark1() and processWatermark2() to add logs and 
check when they are called. I can see that processWatermark1() isn’t called.



For completeness, this is how I use my custom operator:



joinOperator = new CustomIntervalJoinOperator(…);



stream1.connect(stream2)

.keyBy(selector1, selector2)

.transform("Interval Join", TypeInformation.of(Pojo.class), joinOperator);



---



Some more information in case it’s relevant:



- stream2 is obtained from a side output.

- both stream1 and stream2 have watermarks assigned by custom strategies. I 
also log watermark creation, and I can see that watermarks are indeed emitted 
as expected in both streams.



Strangely, my watermark strategies mark themselves idle if they don’t receive 
new events after 10 minutes, and if I send some events and wait 10 minutes, 
processWatermark1() is called! On the other hand, if I continuously send 
events, it is never called.
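[Editor's note: the custom watermark strategies are not shown in the thread. A rough sketch of the idleness behaviour described above, built only from Flink's standard WatermarkStrategy helpers, would look like the fragment below; the Pojo type and its getTimestamp() accessor are assumptions for illustration.]

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Rough equivalent of the described behaviour using the built-in helpers;
// getTimestamp() on Pojo is an assumed accessor, not from the thread.
WatermarkStrategy<Pojo> strategy =
        WatermarkStrategy.<Pojo>forMonotonousTimestamps()
                .withTimestampAssigner((pojo, previousTimestamp) -> pojo.getTimestamp())
                .withIdleness(Duration.ofMinutes(10));
```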



Is this a known issue?



Regards,

Alexis.




Interval join operator is not forwarding watermarks correctly

2022-03-10 Thread Alexis Sarda-Espinosa
Hello,

I'm in the process of updating from Flink 1.11.3 to 1.14.3, and it seems the 
interval join in my pipeline is no longer working. More specifically, I have a 
sliding window after the interval join, and the window isn't firing. After many 
tests, I ended up creating a custom operator that extends IntervalJoinOperator 
and I overrode processWatermark1() and processWatermark2() to add logs and 
check when they are called. I can see that processWatermark1() isn't called.

For completeness, this is how I use my custom operator:

joinOperator = new CustomIntervalJoinOperator(...);

stream1.connect(stream2)
.keyBy(selector1, selector2)
.transform("Interval Join", TypeInformation.of(Pojo.class), joinOperator);

---

Some more information in case it's relevant:

- stream2 is obtained from a side output.
- both stream1 and stream2 have watermarks assigned by custom strategies. I 
also log watermark creation, and I can see that watermarks are indeed emitted 
as expected in both streams.

Strangely, my watermark strategies mark themselves idle if they don't receive 
new events after 10 minutes, and if I send some events and wait 10 minutes, 
processWatermark1() is called! On the other hand, if I continuously send 
events, it is never called.

Is this a known issue?

Regards,
Alexis.



RE: Interval join operator is not forwarding watermarks correctly

2022-03-10 Thread Alexis Sarda-Espinosa
I found [1] and [2], which are closed, but could be related?

[1] https://issues.apache.org/jira/browse/FLINK-23698
[2] https://issues.apache.org/jira/browse/FLINK-18934

Regards,
Alexis.

From: Alexis Sarda-Espinosa 
Sent: Thursday, March 10, 2022 19:27
To: user@flink.apache.org
Subject: Interval join operator is not forwarding watermarks correctly

Hello,

I'm in the process of updating from Flink 1.11.3 to 1.14.3, and it seems the 
interval join in my pipeline is no longer working. More specifically, I have a 
sliding window after the interval join, and the window isn't firing. After many 
tests, I ended up creating a custom operator that extends IntervalJoinOperator 
and I overrode processWatermark1() and processWatermark2() to add logs and 
check when they are called. I can see that processWatermark1() isn't called.

For completeness, this is how I use my custom operator:

joinOperator = new CustomIntervalJoinOperator(...);

stream1.connect(stream2)
.keyBy(selector1, selector2)
.transform("Interval Join", TypeInformation.of(Pojo.class), joinOperator);

---

Some more information in case it's relevant:

- stream2 is obtained from a side output.
- both stream1 and stream2 have watermarks assigned by custom strategies. I 
also log watermark creation, and I can see that watermarks are indeed emitted 
as expected in both streams.

Strangely, my watermark strategies mark themselves idle if they don't receive 
new events after 10 minutes, and if I send some events and wait 10 minutes, 
processWatermark1() is called! On the other hand, if I continuously send 
events, it is never called.

Is this a known issue?

Regards,
Alexis.



Re: Interval join operator is not forwarding watermarks correctly

2022-03-10 Thread Alexis Sarda-Espinosa
I think I managed to create a reproducible example [1]; it seems to be due to 
the use of window + join + window. When I run the test, I never see the print 
output, but if I uncomment part of the code in the watermark generator to mark 
it as idle more quickly, it starts working after a while.

[1] https://github.com/asardaes/flink-interval-join-test


Regards,
Alexis.


From: Alexis Sarda-Espinosa 
Sent: Thursday, March 10, 2022 7:47 PM
To: user@flink.apache.org 
Cc: pnowoj...@apache.org 
Subject: RE: Interval join operator is not forwarding watermarks correctly


I found [1] and [2], which are closed, but could be related?



[1] https://issues.apache.org/jira/browse/FLINK-23698

[2] https://issues.apache.org/jira/browse/FLINK-18934



Regards,

Alexis.



From: Alexis Sarda-Espinosa 
Sent: Thursday, March 10, 2022 19:27
To: user@flink.apache.org
Subject: Interval join operator is not forwarding watermarks correctly



Hello,



I’m in the process of updating from Flink 1.11.3 to 1.14.3, and it seems the 
interval join in my pipeline is no longer working. More specifically, I have a 
sliding window after the interval join, and the window isn’t firing. After many 
tests, I ended up creating a custom operator that extends IntervalJoinOperator 
and I overrode processWatermark1() and processWatermark2() to add logs and 
check when they are called. I can see that processWatermark1() isn’t called.



For completeness, this is how I use my custom operator:



joinOperator = new CustomIntervalJoinOperator(…);



stream1.connect(stream2)

.keyBy(selector1, selector2)

.transform("Interval Join", TypeInformation.of(Pojo.class), joinOperator);



---



Some more information in case it’s relevant:



- stream2 is obtained from a side output.

- both stream1 and stream2 have watermarks assigned by custom strategies. I 
also log watermark creation, and I can see that watermarks are indeed emitted 
as expected in both streams.



Strangely, my watermark strategies mark themselves idle if they don’t receive 
new events after 10 minutes, and if I send some events and wait 10 minutes, 
processWatermark1() is called! On the other hand, if I continuously send 
events, it is never called.



Is this a known issue?



Regards,

Alexis.




Max parallelism and reactive mode

2022-03-03 Thread Alexis Sarda-Espinosa
Hi everyone,

I have some questions regarding max parallelism and how it interacts with 
deployment modes. The documentation states that max parallelism should be "set 
on a per-job and per-operator granularity" but doesn't provide more details. Is 
it possible to have different values of max parallelism in different operators? 
I did a test in which my source had a max parallelism of 3, whereas a 
downstream operator had a (non-max) parallelism explicitly set to 4, and the 
job could not be started. Could this be related to operator chaining? Or maybe 
the whole job ended up with a max parallelism of 3 because I didn't set it and 
it took the value from the source?

Additionally, the documentation states that, in reactive mode, only max 
parallelism is taken into account, so if I want to limit the number of parallel 
instances of my sources and sinks, I'd have to set their max parallelism, and 
that would be different from that of the rest of the operators.

Moreover, is it possible to switch a job from non-reactive to reactive mode via 
savepoints? What happens if my max parallelism settings change during the 
switch? For example, to limit my sink to a single instance.

In summary, for a hypothetical pipeline that basically does something like: 
source (parallelism between 1 & 3) -> stateful operator (parallelism between 1 
& 32) -> sink (parallelism exactly 1 always)
what should I do regarding max parallelism (both for the execution environment 
and for operators) in normal mode, what should I do in reactive mode, and can I 
switch between modes with savepoints?
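[Editor's note: for reference, a minimal sketch of how per-operator max parallelism can be set with the DataStream API is shown below. The sequence source, the modulo key and the reduce stand in for the hypothetical pipeline described above; they are not code from the thread.]

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(32); // job-wide default max parallelism

        DataStream<Long> source = env
                .fromSequence(0, 1_000)
                .setMaxParallelism(3); // cap this operator at 3 parallel instances

        DataStream<Long> summed = source
                .keyBy(value -> value % 10)
                .reduce(Long::sum); // inherits the job-wide max parallelism of 32

        // In normal mode setParallelism(1) pins the sink; in reactive mode, per the
        // documentation cited above, only max parallelism is taken into account.
        summed.print().setParallelism(1);

        env.execute("max-parallelism-sketch");
    }
}
```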

Regards,
Alexis.



RE: Determinism of interval joins

2022-01-29 Thread Alexis Sarda-Espinosa
I think I spoke too soon when I said my watermark strategies were like the 
included ones. My generators mark themselves as idle when they start, and stay 
like that as long as they don't see any events at all. In the tests, I presume 
a variable number of events (and watermarks) from stream1 were consumed before 
anything from stream2 was, so by the time stream2 emitted a watermark to mark 
itself as not idle, it was already too late, and everything was dropped; I 
debugged some of the operators and could see that a lot of inputs were 
considered late since they were processed when the internal watermark service 
already had Long.MAX_VALUE as the current watermark. If I change this idleness 
behavior, I do see changes in the test's output.

When running in real-time, I definitely need to mark some streams as idle after 
some time because I don't expect all of them to receive data constantly. 
However, the non-real-time scenario is also relevant for me, and not just for 
testing: if something crashes in the system and suddenly the pipeline needs to 
process backlog, it would be nice if the semantics were well defined. Ideally, this 
would mean, for two-input operators in general I imagine, that when an operator 
knows that all streams from one input have passed a certain watermark (based on 
slide/tumble time), it would switch and consume from the other stream to check 
whether it's idle or not. I suppose this wouldn't be a definite guarantee 
either since the data from the different streams may take some time to reach 
the different operators (latency and whatnot), but it would still be useful.

I imagine the details are more complex and I'm oversimplifying a bit (I don't 
know how the network stack works), but I would think this kind of semantics are 
commonly expected when handling multiple streams that need joins and so on. 
What do you think?

Regards,
Alexis.

From: Robert Metzger 
Sent: Friday, January 28, 2022 14:49
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Determinism of interval joins

Instead of using `reinterpretAsKeyedStream` can you use keyBey and see if the 
behavior gets deterministic?

On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
I'm not sure if the issue in [1] is relevant since it mentions the Table API, 
but it could be. Since stream1 and stream2 in my example have a long chain of 
operators behind, I presume they might "run" at very different paces.

Oh and, in the context of my unit tests, watermarks should be deterministic, 
the input file is sorted, and the watermark strategies should essentially 
behave like the monotonous generator.

[1] https://issues.apache.org/jira/browse/FLINK-24466

Regards,
Alexis.

________
From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Thursday, January 27, 2022 1:30 PM
To: user@flink.apache.org
Subject: Determinism of interval joins


Hi everyone,



I’m seeing a lack of determinism in unit tests when using an interval join. I 
am testing with both Flink 1.11.3 and 1.14.3 and the relevant bits of my 
pipeline look a bit like this:



keySelector1 = …

keySelector2 = …



rightStream = stream1

  .flatMap(…)

  .keyBy(keySelector1)

  .assignTimestampsAndWatermarks(strategy1)



leftStream = stream2

  .keyBy(keySelector2)

  .assignTimestampsAndWatermarks(strategy2)



joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream, 
keySelector2)

  .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream, 
keySelector1))

  .between(Time.minutes(-10L), Time.milliseconds(0L))

  .lowerBoundExclusive()

  .process(new IntervalJoinFunction(…))



---



In my tests, I have a bounded source that loads demo data from a file and 
simulates the stream with a sink that collects results in memory. In the 
specific case of my IntervalJoinFunction, I’m seeing that it’s called a 
different number of times in a non-deterministic way: sometimes I see 14 calls 
to its processElement() method, other times 8, other times none at all and my 
output is empty; I count this by checking my logs with some tracing.



Does anyone know why this is? Maybe I’m doing something wrong, particularly 
with reinterpretAsKeyedStream.



Regards,

Alexis.




RE: Determinism of interval joins

2022-02-02 Thread Alexis Sarda-Espinosa
Well, for those who might be interested in the semantics I mentioned, I 
implemented a custom operator that seems to achieve what I want by mostly 
ignoring the actual timestamps from the side stream's watermarks. However, it 
kind of depends on the fact that my main stream comes from a previous window 
and is watermarked with "windowEnd - 1" (thus "timestamp1 + 1" below).

public class PrioritizedWatermarkStreamIntervalJoinOperator extends IntervalJoinOperator<...> {
    private static final long serialVersionUID = 1L;

    // Highest watermark seen so far on input 1 (main/window stream) and input 2 (side stream).
    private long maxTimestamp1 = Long.MIN_VALUE;
    private long maxTimestamp2 = Long.MIN_VALUE;

    public PrioritizedWatermarkStreamIntervalJoinOperator(...) {
        super(...);
    }

    @Override
    public void processWatermark1(Watermark mark) throws Exception {
        if (mark.getTimestamp() > maxTimestamp1) {
            maxTimestamp1 = mark.getTimestamp();
        }
        super.processWatermark1(mark);
        maybeProcessWatermark2(mark, mark.getTimestamp(), maxTimestamp2);
    }

    // Input 2's clock is never advanced past (input 1's watermark + 1), so input 1
    // effectively drives the operator's event-time clock.
    private void maybeProcessWatermark2(Watermark mark, long timestamp1, long maxTimestampForComparison)
            throws Exception {
        if (mark.equals(Watermark.MAX_WATERMARK)
                && maxTimestampForComparison == Watermark.MAX_WATERMARK.getTimestamp()) {
            super.processWatermark2(Watermark.MAX_WATERMARK);
        } else if (maxTimestamp2 > maxTimestamp1) {
            if (timestamp1 == Long.MAX_VALUE) {
                LOG.warn("Trying to bump timestamp1 would result in overflow, skipping.");
                return;
            }
            super.processWatermark2(new Watermark(timestamp1 + 1L));
        }
    }

    @Override
    public void processWatermark2(Watermark mark) throws Exception {
        if (mark.getTimestamp() > maxTimestamp2) {
            maxTimestamp2 = mark.getTimestamp();
        }
        maybeProcessWatermark2(mark, maxTimestamp1, maxTimestamp1);
    }
}

Regards,
Alexis.

From: Alexis Sarda-Espinosa 
Sent: Saturday, January 29, 2022 13:47
To: Robert Metzger 
Cc: user@flink.apache.org
Subject: RE: Determinism of interval joins

I think I spoke too soon when I said my watermark strategies were like the 
included ones. My generators mark themselves as idle when they start, and stay 
like that as long as they don't see any events at all. In the tests, I presume 
a variable number of events (and watermarks) from stream1 were consumed before 
anything from stream2 was, so by the time stream2 emitted a watermark to mark 
itself as not idle, it was already too late, and everything was dropped; I 
debugged some of the operators and could see that a lot of inputs were 
considered late since they were processed when the internal watermark service 
already had Long.MAX_VALUE as the current watermark. If I change this idleness 
behavior, I do see changes in the test's output.

When running in real-time, I definitely need to mark some streams as idle after 
some time because I don't expect all of them to receive data constantly. 
However, the non-real-time scenario is also relevant for me, and not just for 
testing: if something crashes in the system and suddenly the pipeline needs to 
process backlog, it would be nice if the semantics were well defined. Ideally, this 
would mean, for two-input operators in general I imagine, that when an operator 
knows that all streams from one input have passed a certain watermark (based on 
slide/tumble time), it would switch and consume from the other stream to check 
whether it's idle or not. I suppose this wouldn't be a definite guarantee 
either since the data from the different streams may take some time to reach 
the different operators (latency and whatnot), but it would still be useful.

I imagine the details are more complex and I'm oversimplifying a bit (I don't 
know how the network stack works), but I would think this kind of semantics are 
commonly expected when handling multiple streams that need joins and so on. 
What do you think?

Regards,
Alexis.

From: Robert Metzger <metrob...@gmail.com>
Sent: Friday, January 28, 2022 14:49
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Determinism of interval joins

Instead of using `reinterpretAsKeyedStream` can you use keyBey and see if the 
behavior gets deterministic?

On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
I'm not sure if the issue in [1] is relevant since it mentions the Table API, 
but it could be. Since stream1 and stream2 in my example have a long chain of 
operators behind, I presume they might "run" at very different paces.

Oh and, in the context of my unit tests, watermarks should be deterministic, 
the input file is sorted, and the watermark strategies should essentially 
behave like the monoto

RE: Pojo State Migration - NPE with field deletion

2022-02-02 Thread Alexis Sarda-Espinosa
Hello,

Happened to me too, here’s the JIRA ticket: 
https://issues.apache.org/jira/browse/FLINK-21752

Regards,
Alexis.

From: bastien dine 
Sent: Wednesday, February 2, 2022 16:01
To: user 
Subject: Pojo State Migration - NPE with field deletion

Hello,

I have some trouble restoring a state (POJO) after deleting a field.
According to the documentation, this should not be a problem with POJOs:
"Fields can be removed. Once removed, the previous value for the removed field 
will be dropped in future checkpoints and savepoints."

Here is a short stack trace (full trace is below) :

Caused by: java.lang.NullPointerException
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)


After some debugging, it seems that the deleted POJO field still has a field 
serializer in the corresponding PojoSerializer's "fieldSerializers" array,
but it is not present in "fields", where we have a gap of 1 index (for 
example 0-1-3-4).
So when the serializer reaches index 2, we get this NPE.

Why is the deleted field's serializer still present? It should have been 
dropped when resolving schema compatibility, right?
I cannot find anything on that matter; could someone help me with it?
Reproduced in Flink 1.13 & 1.14; I cannot find any related JIRA either.

Best Regards,
Bastien

Full stack trace :
2022-02-02 15:44:20
java.io.IOException: Could not perform checkpoint 2737490 for 
operator OperatorXXX
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
complete snapshot 2737490 for operator OperatorXXX
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)

Re: Determinism of interval joins

2022-01-27 Thread Alexis Sarda-Espinosa
I'm not sure if the issue in [1] is relevant since it mentions the Table API, 
but it could be. Since stream1 and stream2 in my example have a long chain of 
operators behind, I presume they might "run" at very different paces.

Oh and, in the context of my unit tests, watermarks should be deterministic, 
the input file is sorted, and the watermark strategies should essentially 
behave like the monotonous generator.

[1] https://issues.apache.org/jira/browse/FLINK-24466

Regards,
Alexis.

____
From: Alexis Sarda-Espinosa 
Sent: Thursday, January 27, 2022 1:30 PM
To: user@flink.apache.org 
Subject: Determinism of interval joins


Hi everyone,



I’m seeing a lack of determinism in unit tests when using an interval join. I 
am testing with both Flink 1.11.3 and 1.14.3 and the relevant bits of my 
pipeline look a bit like this:



keySelector1 = …

keySelector2 = …



rightStream = stream1

  .flatMap(…)

  .keyBy(keySelector1)

  .assignTimestampsAndWatermarks(strategy1)



leftStream = stream2

  .keyBy(keySelector2)

  .assignTimestampsAndWatermarks(strategy2)



joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream, 
keySelector2)

  .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream, 
keySelector1))

  .between(Time.minutes(-10L), Time.milliseconds(0L))

  .lowerBoundExclusive()

  .process(new IntervalJoinFunction(…))



---



In my tests, I have a bounded source that loads demo data from a file and 
simulates the stream with a sink that collects results in memory. In the 
specific case of my IntervalJoinFunction, I’m seeing that it’s called a 
different number of times in a non-deterministic way: sometimes I see 14 calls 
to its processElement() method, other times 8, other times none at all and my 
output is empty; I count this by checking my logs with some tracing.



Does anyone know why this is? Maybe I’m doing something wrong, particularly 
with reinterpretAsKeyedStream.



Regards,

Alexis.




Determinism of interval joins

2022-01-27 Thread Alexis Sarda-Espinosa
Hi everyone,

I'm seeing a lack of determinism in unit tests when using an interval join. I 
am testing with both Flink 1.11.3 and 1.14.3 and the relevant bits of my 
pipeline look a bit like this:

keySelector1 = ...
keySelector2 = ...

rightStream = stream1
  .flatMap(...)
  .keyBy(keySelector1)
  .assignTimestampsAndWatermarks(strategy1)

leftStream = stream2
  .keyBy(keySelector2)
  .assignTimestampsAndWatermarks(strategy2)

joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream, 
keySelector2)
  .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream, 
keySelector1))
  .between(Time.minutes(-10L), Time.milliseconds(0L))
  .lowerBoundExclusive()
  .process(new IntervalJoinFunction(...))

---

In my tests, I have a bounded source that loads demo data from a file and 
simulates the stream with a sink that collects results in memory. In the 
specific case of my IntervalJoinFunction, I'm seeing that it's called a 
different number of times in a non-deterministic way: sometimes I see 14 calls 
to its processElement() method, other times 8, other times none at all and my 
output is empty; I count this by checking my logs with some tracing.

Does anyone know why this is? Maybe I'm doing something wrong, particularly 
with reinterpretAsKeyedStream.

Regards,
Alexis.



RE: Buffering when connecting streams

2022-01-18 Thread Alexis Sarda-Espinosa
Hi again everyone,

It’s been a while, so first of all happy new year :)

I was revisiting this discussion and started looking at the code. However, it 
seems that all of the overloads of ConnectedStreams#process expect a 
CoProcessFunction or the Keyed counterpart, so I don’t think I can inject a 
custom TwoInputStreamOperator.

After a quick glance at the joining documentation, I wonder if I could 
accomplish what I want with a window/interval join of streams. If so, I might 
be able to avoid using state in the join function, but if I can’t avoid it, is 
it possible to use managed state in a (Process)JoinFunction? The join needs 
keys, but I don’t know if the resulting stream counts as keyed from the state’s 
point of view.

Regards,
Alexis.

From: Piotr Nowojski 
Sent: Monday, December 6, 2021 08:43
To: David Morávek 
Cc: Alexis Sarda-Espinosa ; 
user@flink.apache.org
Subject: Re: Buffering when connecting streams

Hi Alexis and David,

This actually cannot happen. There are mechanisms in the code to make sure 
none of the inputs is starved IF there is some data to be read.

The only time an input can be blocked is during the alignment phase of 
aligned checkpoints under back pressure. If there was back pressure in your 
job, it could have easily happened that checkpoint barriers would flow through 
the job graph to the KeyedCoProcessFunction on one of the paths much quicker 
than on the other, causing this faster path to be blocked until the other 
side caught up. But that would happen only during the alignment phase of the 
checkpoint, so without back pressure only for a very short period of time.

Piotrek

On Thu, Dec 2, 2021 at 6:23 PM David Morávek
<d...@apache.org> wrote:
I think this could happen, but I have very limited knowledge of how the 
input gates work internally. @Piotr could definitely provide some more insight 
here.

D.

On Thu, Dec 2, 2021 at 5:54 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
I do have some logic with timers today, but it’s indeed not ideal. I guess I’ll 
have a look at TwoInputStreamOperator, but I do have related questions. You 
mentioned a sample scenario of "processing backlog" where windows fire very 
quickly; could it happen that, in such a situation, the framework calls the 
operator’s processElement1 continuously (even for several minutes) before 
calling processElement2 a single time? How does the framework decide when to 
switch between the two inputs when the streams are connected?

Regards,
Alexis.

From: David Morávek <d...@apache.org>
Sent: Thursday, December 2, 2021 17:18
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

Even with a TwoInputStreamOperator you cannot "halt" the processing. You 
need to buffer these elements, for example in ListState, for later 
processing. Once the watermark of the second stream arrives, you can 
process all buffered elements that satisfy the condition.

You could probably also implement a similar (less optimized) solution with 
KeyedCoProcessFunction using event time timers.

Best,
D.
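[Editor's note: a minimal sketch of the timer-based variant mentioned above is shown below. The WindowResult and SideEvent types, their accessors, the String key and the "open" check are placeholders standing in for whatever the actual job uses; this is one possible reading of the condition discussed in the thread, not the thread's own implementation.]

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// WindowResult and SideEvent are hypothetical placeholder types.
public class BufferedCheckFunction
        extends KeyedCoProcessFunction<String, WindowResult, SideEvent, WindowResult> {

    private transient ListState<WindowResult> pending;
    private transient ValueState<String> latestSideState;

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-window-results", WindowResult.class));
        latestSideState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-side-state", String.class));
    }

    @Override
    public void processElement1(WindowResult result, Context ctx, Collector<WindowResult> out)
            throws Exception {
        // Buffer the window result and register a timer at the window's max timestamp.
        // The timer only fires once the combined watermark (the minimum of both inputs)
        // passes that timestamp, i.e. once the side stream has caught up as well.
        pending.add(result);
        ctx.timerService().registerEventTimeTimer(result.getWindowMaxTimestamp());
    }

    @Override
    public void processElement2(SideEvent event, Context ctx, Collector<WindowResult> out)
            throws Exception {
        latestSideState.update(event.getState());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<WindowResult> out)
            throws Exception {
        List<WindowResult> stillPending = new ArrayList<>();
        for (WindowResult result : pending.get()) {
            if (result.getWindowMaxTimestamp() <= timestamp) {
                // Due: emit only if the side stream still reports this key as "open".
                if ("open".equals(latestSideState.value())) {
                    out.collect(result);
                }
            } else {
                stillPending.add(result);
            }
        }
        pending.update(stillPending);
    }
}
```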

On Thu, Dec 2, 2021 at 5:12 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
Yes, that sounds right, but with my current KeyedCoProcessFunction I can’t tell 
Flink to "halt" processElement1 and switch to the other stream depending on 
watermarks. I could look into TwoInputStreamOperator if you think that’s the 
best approach.

Regards,
Alexis.

From: David Morávek <d...@apache.org>
Sent: Thursday, December 2, 2021 16:59
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

I think this would require using the lower-level API and implementing a custom 
`TwoInputStreamOperator`. Then you can hook into the `processWatermark{1,2}` methods.

Let's also make sure we're on the same page about what the watermark is. You can 
think of the watermark as an event-time clock. It basically tells you that no more 
events with a timestamp lower than the watermark should appear in your stream.

You simply delay emitting the window result from your "connect" operator until 
the watermark from the second (side output) stream passes the window's max 
timestamp (the maximum timestamp that is included in the window).

Does that make sense?

Best,
D.

On Thu, Dec 2, 2021 at 4:25 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
Could you elaborate on what you mean with synchronize? Buffering in the state 
would be fine, but I haven’t been able to come up with a good way of ensuring 
that all data from the side stream for a given minute is processed by 
processElement2 before all data

Re: Flink native k8s integration vs. operator

2022-01-20 Thread Alexis Sarda-Espinosa
Hi Robert,

I agree with you; I mean, that's why I was writing a K8s operator, but the 
restriction wasn't decided by me, it was imposed on me. I guess my thinking was 
rather that an operator wouldn't necessarily supersede standalone + reactive, 
at least not in my case, but that certainly doesn't mean an operator is a bad 
idea; it's just something that other users might want to keep in mind.

Regards,
Alexis.


From: Robert Metzger 
Sent: Thursday, January 20, 2022 7:06 PM
To: Alexis Sarda-Espinosa 
Cc: dev ; user 
Subject: Re: Flink native k8s integration vs. operator

Hi Alexis,

The usage of Custom Resource Definitions (CRDs). The main reason given to me 
was that such resources are global (for a given cluster) and that is not 
desired. I know that ultimately a CR based on a CRD can be scoped to a specific 
namespace, but customer is king…

I don't think this restriction applies to many organizations. K8s operators are 
the de facto standard for deploying all kinds of software. There are quite many 
projects that used to just have a Helm chart, that are now switching over to 
provide operators, because they provide a much better experience.
If you have more specifics on this concern that is relevant for the Flink 
community, I'd like to hear that.


Kubernetes Service Accounts (SAs) with roles to create deployments/pods. This 
one is more understandable, particularly after the whole log4j debacle. Roles 
that manage solely deployment.scale subresources would be acceptable though.

This requirement is not strictly needed to deploy Flink on K8s. Only with the 
native K8s integration of Flink, you need to give the Flink JVM a role that 
allows creating other pods.


Best,
Robert

On Tue, Jan 18, 2022 at 5:18 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:

Hi everyone,



Since I see this is getting some traction, I’d like to add a couple things. I 
had been developing a Kubernetes controller for Flink as a Proof of Concept at 
my company; I called it Flork because it was to be a Flink Orchestrator for 
Kubernetes. In the end, we will most likely not use this controller due to 
security concerns that were communicated to me. These concerns stem from the 
fact that our product would be used by customers in their own Kubernetes 
clusters, and many customers don’t want:



- The usage of Custom Resource Definitions (CRDs). The main reason given to me 
was that such resources are global (for a given cluster) and that is not 
desired. I know that ultimately a CR based on a CRD can be scoped to a specific 
namespace, but customer is king…



- Kubernetes Service Accounts (SAs) with roles to create deployments/pods. This 
one is more understandable, particularly after the whole log4j debacle. Roles 
that manage solely deployment.scale subresources would be acceptable though.



I mention these in case they prove to be relevant for others in the current 
context. For us, it means we have to stick with something like standalone 
Kubernetes + reactive/adaptive.



Nevertheless, the PoC I had was already functional and, while I would have to 
request permission to contribute the code to the community, it might be useful 
for these efforts. However, I’d first ask if there is actually interest in this 
code, considering these are some of the “features” it currently has:



* The CRD relies on the Pod Template support included in Flink itself. As such, 
some of the fields in the CRD are “vanilla” pod specs, and the schema reflects 
that because it embeds a flattened version of the schema from [1]. I’d also 
have a basic Helm chart ready.



* The code is written in a mixture of Java and Kotlin, and is built with 
Gradle. I made heavy use of Kotlin Coroutines to implement some of the core 
logic in a non-blocking way.



* The code already supports High Availability by leveraging Kubernetes leases 
and the corresponding helpers in Fabric8’s client.



* The main deployment logic is delegated to Flink’s own flink-kubernetes module 
[2]. Nevertheless, my build shadows all the fabric8 classes and service 
definitions embedded in said module, so that the rest of the code can use other 
kubernetes-client versions independently.



* The controller handles savepoint creation for redeployments after CR changes, 
e.g. upgrades. This would also work after controller fail-over with/without HA.



* The code supports some extension for custom container images: classes defined 
in META-INF/services/ files are called as decorators for Flink’s conf file 
and/or the pod specs defined in the CR, and they could be copied to the image 
on top of an official base version.



* A deployment mode without CRD could be supported --- I have some code that 
can run on top of the core controller and allows “embedding” a CR in a Config 
Map key. The translation between the CM and the core controller code is then 
done transparently.



* I have a module that integrates th

RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-12 Thread Alexis Sarda-Espinosa
Thanks for all the pointers. The UI does show combined state for a chain, but 
the only state descriptors inside that chain are the 3 I mentioned before. Its 
size is still increasing today, and duration is now around 30 seconds (I can't 
use unaligned checkpoints because I use partitionCustom).

I've executed the state processor program for all of the 50 chk-* folders, but 
I don't see anything weird. The counts go up and down depending on which one I 
load, but even the bigger ones have around 500-700 entries, which should only 
be a couple hundred KB; it's not growing monotonically.

The chain of operators is relatively simple:

timestampedStream = inputStream -> keyBy -> assignTimestampsAndWatermarks
windowedStream = timestampedStream -> reinterpretAsKeyedStream -> window (SlidingEventTimeWindows)
windowedStream -> process1 -> sink1
windowedStream -> process2 -> sink2
windowedStream -> process3 -> map

And according to the Low Watermark I see in the UI, event time is advancing 
correctly.

Could you confirm if Flink's own operators could be creating state in RocksDB? 
I assume the window operators save some information in the state as well.

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Tuesday, April 12, 2022 14:06
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

The /shared folder contains keyed state that is shared among different checkpoints 
[1]. Most of the state should be shared in your case since you're using keyed 
state and incremental checkpoints.

When a checkpoint is loaded, the state that it shares with older checkpoints is 
loaded as well. I suggested loading different checkpoints (i.e. chk-* folders) 
and comparing the numbers of objects in their states. To prevent the job from 
discarding the state, it can either be stopped for some time and then restarted 
from the latest checkpoint, or the number of retained checkpoints can be 
increased [2]. Copying isn't necessary.
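[Editor's note: a minimal sketch of that kind of per-checkpoint inspection, assuming the DataSet-based State Processor API of Flink 1.13/1.14, is shown below. The checkpoint path, the operator uid and the state descriptor are placeholders that would have to match the actual job, and the key/value types are assumptions for illustration.]

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class CheckpointEntryCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load one retained checkpoint directory; path and uid are placeholders.
        ExistingSavepoint checkpoint = Savepoint.load(
                env, "file:///path/to/checkpoints/chk-1234", new EmbeddedRocksDBStateBackend());

        DataSet<String> keys = checkpoint.readKeyedState(
                "my-process-operator-uid",
                new KeyedStateReaderFunction<String, String>() {
                    private ValueState<String> state;

                    @Override
                    public void open(Configuration parameters) {
                        // Descriptor name and type must match the one used in the job.
                        state = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("my-state", String.class));
                    }

                    @Override
                    public void readKey(String key, Context ctx, Collector<String> out) {
                        out.collect(key); // one record per key that has state
                    }
                });

        System.out.println("entries: " + keys.count());
    }
}
```

Running the same program against several chk-* folders and comparing the resulting counts is one way to see which state keeps growing.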

Besides that, you can also check the state sizes of operators in the Flink Web UI 
(but not the sizes of individual states). If the operators are chained, then their 
combined state size will be shown. To prevent this, you can disable chaining 
[3] (although this will have a performance impact).

Individual checkpoint folders should be eventually removed (when the checkpoint 
is subsumed). However, this is not guaranteed: if there is any problem during 
deletion, it will be logged, but the job will not fail.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining

Regards,
Roman

On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
 wrote:
>
> Hi Roman,
>
> Maybe I'm misunderstanding the structure of the data within the checkpoint. 
> You suggest comparing counts of objects in different checkpoints, I assume 
> you mean copying my "checkpoints" folder at different times and comparing, 
> not comparing different "chk-*" folders in the same snapshot, right?
>
> I haven't executed the processor program with a newer checkpoint, but I did 
> look at the folder in the running system, and I noticed that most of the 
> chk-* folders have remained unchanged, there's only 1 or 2 new folders 
> corresponding to newer checkpoints. I would think this makes sense since the 
> configuration specifies that only 1 completed checkpoint should be retained, 
> but then why are the older chk-* folders still there? I did trigger a manual 
> restart of the Flink cluster in the past (before starting the long-running 
> test), but if my policy is to CLAIM the checkpoint, Flink's documentation 
> states that it would be cleaned eventually.
>
> Moreover, just by looking at folder sizes with "du", I can see that most of 
> the state is held in the "shared" folder, and that has grown for sure; I'm 
> not sure what "shared" usually holds, but if that's what's growing, maybe I 
> can rule out expired state staying around?. My pipeline doesn't use timers, 
> although I guess Flink itself may use them. Is there any way I could get some 
> insight into which operator holds larger states?
>
> Regards,
> Alexis.
>
> -Original Message-
> From: Roman Khachatryan 
> Sent: Dienstag, 12. April 2022 12:37
> To: Alexis Sarda-Espinosa 
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
> Hi Alexis,
>
> Thanks a lot for sharing this. I think the program is correct.
> Although it doesn'

RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-12 Thread Alexis Sarda-Espinosa
Hi Roman,

Maybe I'm misunderstanding the structure of the data within the checkpoint. You 
suggest comparing counts of objects in different checkpoints, I assume you mean 
copying my "checkpoints" folder at different times and comparing, not comparing 
different "chk-*" folders in the same snapshot, right?

I haven't executed the processor program with a newer checkpoint, but I did 
look at the folder in the running system, and I noticed that most of the chk-* 
folders have remained unchanged, there's only 1 or 2 new folders corresponding 
to newer checkpoints. I would think this makes sense since the configuration 
specifies that only 1 completed checkpoint should be retained, but then why are 
the older chk-* folders still there? I did trigger a manual restart of the 
Flink cluster in the past (before starting the long-running test), but if my 
policy is to CLAIM the checkpoint, Flink's documentation states that it would 
be cleaned eventually.

Moreover, just by looking at folder sizes with "du", I can see that most of the 
state is held in the "shared" folder, and that has grown for sure; I'm not sure 
what "shared" usually holds, but if that's what's growing, maybe I can rule out 
expired state staying around? My pipeline doesn't use timers, although I guess 
Flink itself may use them. Is there any way I could get some insight into which 
operator holds larger states?

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Dienstag, 12. April 2022 12:37
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Alexis,

Thanks a lot for sharing this. I think the program is correct.
Although it doesn't take timers into account; and to estimate the state size 
more accurately, you could also use the same serializers used by the job.
But maybe it makes more sense to compare the counts of objects in different 
checkpoints and see which state is growing.
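
As a rough sketch of the serializer idea (MyPojo stands in for the actual state type; the 
same approach works with whatever serializer the descriptors create):

static long serializedSize(MyPojo value) throws IOException {
    TypeSerializer<MyPojo> serializer =
            TypeInformation.of(MyPojo.class).createSerializer(new ExecutionConfig());
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    serializer.serialize(value, new DataOutputViewStreamWrapper(bytes));
    return bytes.size();
}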

If the number of keys is small, compaction should eventually clean up the old 
values, given that the windows eventually expire. I think it makes sense to 
check that watermarks in all windows are making progress.

Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the results of 
the State Processor program.

Regards,
Roman

On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa 
 wrote:
>
> Some additional information that I’ve gathered:
>
>
>
> The number of unique keys in the system is 10, and that is correctly 
> reflected in the state.
> TTL for global window state is set to update on read and write, but the code 
> has logic to remove old state based on event time.
> Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> GitHub gist with the whole processor setup since it’s not too long: 
> https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
>
>
>
> Relevant configuration entries (explicitly set, others are left with 
> defaults):
>
>
>
> state.backend: rocksdb
>
> state.backend.incremental: true
>
> execution.checkpointing.interval: 30 s
>
> execution.checkpointing.min-pause: 25 s
>
> execution.checkpointing.timeout: 5 min
>
> execution.savepoint-restore-mode: CLAIM
>
> execution.checkpointing.externalized-checkpoint-retention: 
> RETAIN_ON_CANCELLATION
>
>
>
> Over the weekend, state size has grown to 1.23GB with the operators 
> referenced in the processor program taking 849MB, so I’m still pretty 
> puzzled. I thought it could be due to expired state being retained, but I 
> think that doesn’t make sense if I have finite keys, right?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> From: Alexis Sarda-Espinosa 
> Sent: Samstag, 9. April 2022 01:39
> To: ro...@apache.org
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
>
>
> Hi Roman,
>
>
>
> Here's an example of a WindowReaderFunction:
>
>
>
> public class StateReaderFunction extends 
> WindowReaderFunction {
>
> private static final ListStateDescriptor LSD = new 
> ListStateDescriptor<>(
>
> "descriptorId",
>
> Integer.class
>
> );
>
>
>
> @Override
>
> public void readWindow(String s, Context context, 
> Iterable elements, Collector out) throws Exception {
>
> int count = 0;
>
> for (Integer i : 
> context.windowState().getListState(LSD).get()) {
>
> count++;
>
> }
>
> out.collect(count);
>
> }
>
> }
>
>
>
> That's for the operator that uses window state. 

RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-11 Thread Alexis Sarda-Espinosa
Some additional information that I've gathered:


  *   The number of unique keys in the system is 10, and that is correctly 
reflected in the state.
  *   TTL for global window state is set to update on read and write, but the 
code has logic to remove old state based on event time.
  *   Not sure it's relevant, but the Flink cluster does run with jemalloc 
enabled.
  *   GitHub gist with the whole processor setup since it's not too long: 
https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678

Relevant configuration entries (explicitly set, others are left with defaults):

state.backend: rocksdb
state.backend.incremental: true
execution.checkpointing.interval: 30 s
execution.checkpointing.min-pause: 25 s
execution.checkpointing.timeout: 5 min
execution.savepoint-restore-mode: CLAIM
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

Over the weekend, state size has grown to 1.23GB with the operators referenced 
in the processor program taking 849MB, so I'm still pretty puzzled. I thought 
it could be due to expired state being retained, but I think that doesn't make 
sense if I have finite keys, right?

Regards,
Alexis.

From: Alexis Sarda-Espinosa 
Sent: Samstag, 9. April 2022 01:39
To: ro...@apache.org
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Roman,

Here's an example of a WindowReaderFunction:

public class StateReaderFunction extends WindowReaderFunction {
private static final ListStateDescriptor LSD = new 
ListStateDescriptor<>(
"descriptorId",
Integer.class
);

@Override
public void readWindow(String s, Context context, 
Iterable elements, Collector out) throws Exception {
int count = 0;
for (Integer i : context.windowState().getListState(LSD).get()) {
count++;
}
out.collect(count);
}
}

That's for the operator that uses window state. The other readers do something 
similar but with context.globalState(). That should provide the number of state 
entries for each key+window combination, no? And after collecting all results, 
I would get the number of state entries across all keys+windows for an operator.

And yes, I do mean ProcessWindowFunction.clear(). Therein I call 
context.windowState().getListState(...).clear().


Side note: in the state processor program I call 
ExecutionEnvironment#setParallelism(1) even though my streaming job runs with 
parallelism=4, this doesn't affect the result, does it?

Regards,
Alexis.


From: Roman Khachatryan mailto:ro...@apache.org>>
Sent: Friday, April 8, 2022 11:06 PM
To: Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Alexis,

If I understand correctly, the provided StateProcessor program gives
you the number of stream elements per operator. However, you mentioned
that these operators have collection-type states (ListState and
MapState). That means that per one entry there can be an arbitrary
number of state elements.

Have you tried estimating the state sizes directly via readKeyedState[1]?

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman


On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
mailto:alexis.sarda-espin...@microfocus.com>>
 wrote:
>
> Hello,
>
>
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with 
> RocksDB with incremental checkpoints as backend. I've been monitoring a dev 
> environment that has been running for the last week and I noticed that state 
> size and end-to-end duration have been increasing steadily. Currently, 
> duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with 
> the largest state (614MB) come from keyed sliding windows. Some attributes of 
> this job's setup:
>
>
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
>
>
> I have 3 operators with these states:
>
>
>
> Window state with ListState and no TTL.
> Global window state with MapState> and a TTL o

Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-08 Thread Alexis Sarda-Espinosa
Hi Roman,

Here's an example of a WindowReaderFunction:

public class StateReaderFunction extends WindowReaderFunction {
    private static final ListStateDescriptor LSD = new ListStateDescriptor<>(
            "descriptorId",
            Integer.class
    );

    @Override
    public void readWindow(String s, Context context, Iterable elements, Collector out) throws Exception {
        int count = 0;
        for (Integer i : context.windowState().getListState(LSD).get()) {
            count++;
        }
        out.collect(count);
    }
}

That's for the operator that uses window state. The other readers do something 
similar but with context.globalState(). That should provide the number of state 
entries for each key+window combination, no? And after collecting all results, 
I would get the number of state entries across all keys+windows for an operator.

And yes, I do mean ProcessWindowFunction.clear(). Therein I call 
context.windowState().getListState(...).clear().

Side note: in the state processor program I call 
ExecutionEnvironment#setParallelism(1) even though my streaming job runs with 
parallelism=4, this doesn't affect the result, does it?

Regards,
Alexis.


From: Roman Khachatryan 
Sent: Friday, April 8, 2022 11:06 PM
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org 
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Alexis,

If I understand correctly, the provided StateProcessor program gives
you the number of stream elements per operator. However, you mentioned
that these operators have collection-type states (ListState and
MapState). That means that per one entry there can be an arbitrary
number of state elements.

Have you tried estimating the state sizes directly via readKeyedState[1]?
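
A minimal sketch of what that could look like for the list state (state name, types and 
the operator uid below are placeholders):

public class EntryCountReader extends KeyedStateReaderFunction<String, Integer> {
    private transient ListState<Integer> listState;

    @Override
    public void open(Configuration parameters) {
        listState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("descriptorId", Integer.class));
    }

    @Override
    public void readKey(String key, Context context, Collector<Integer> out) throws Exception {
        int entries = 0;
        Iterable<Integer> values = listState.get();
        if (values != null) {
            for (Integer ignored : values) {
                entries++;
            }
        }
        out.collect(entries);
    }
}

// usage with the 1.14 DataSet-based state processor API:
// DataSet<Integer> counts = savepoint.readKeyedState("my-uid", new EntryCountReader());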

> The other operator does override and call clear()
Just to make sure, you mean ProcessWindowFunction.clear() [2], right?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-

Regards,
Roman


On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
 wrote:
>
> Hello,
>
>
>
> I have a streaming job running on Flink 1.14.4 that uses managed state with 
> RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev 
> environment that has been running for the last week and I noticed that state 
> size and end-to-end duration have been increasing steadily. Currently, 
> duration is 11 seconds and size is 917MB (as shown in the UI). The tasks with 
> the largest state (614MB) come from keyed sliding windows. Some attributes of 
> this job’s setup:
>
>
>
> Windows are 11 minutes in size.
> Slide time is 1 minute.
> Throughput is approximately 20 events per minute.
>
>
>
> I have 3 operators with these states:
>
>
>
> Window state with ListState and no TTL.
> Global window state with MapState> and a TTL of 1 hour 
> (with cleanupInRocksdbCompactFilter(1000L)).
> Global window state with ListState where the Pojo has an int and a 
> long, a TTL of 1 hour, and configured with 
> cleanupInRocksdbCompactFilter(1000L) as well.
>
>
>
> Both operators with global window state have logic to manually remove old 
> state in addition to configured TTL. The other operator does override and 
> call clear().
>
>
>
> I have now analyzed the checkpoint folder with the state processor API, and 
> I’ll note here that I see 50 folders named chk-*** even though I don’t set 
> state.checkpoints.num-retained and the default should be 1. I loaded the data 
> from the folder with the highest chk number and I see that my operators have 
> these amounts respectively:
>
>
>
> 10 entries
> 80 entries
> 200 entries
>
>
>
> I got those numbers with something like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
> .process(...)
>
> .collect()
>
> .parallelStream()
>
> .reduce(0, Integer::sum);
>
>
>
> Where my WindowReaderFunction classes just count the number of entries in 
> each call to readWindow.
>
>
>
> Those amounts cannot possibly account for 614MB, so what am I missing?
>
>
>
> Regards,
>
> Alexis.
>
>


RE: Using state processor API to read state defined with a TypeHint

2022-04-08 Thread Alexis Sarda-Espinosa
Hi Roman,

Thanks for the quick response. It wasn't that, but your comment about erasure 
made me realize I should have debugged the code and looked at the types. 
Apparently setting TTL changes the serializer, so I also had to add TTL in the 
WindowReaderFunction.
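
In case someone hits the same issue, roughly what was needed in the reader (the 1 hour TTL 
and the Set<String> value type below are only stand-ins for whatever the job actually uses):

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
        .updateTtlOnReadAndWrite()
        .cleanupInRocksdbCompactFilter(1000L)
        .build();
MapStateDescriptor<Long, Set<String>> msd = new MapStateDescriptor<>(
        "descriptorName",
        TypeInformation.of(Long.class),
        TypeInformation.of(new TypeHint<Set<String>>() {}));
msd.enableTimeToLive(ttlConfig); // the reader's serializer now matches the TTL-wrapped one from the job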

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Freitag, 8. April 2022 11:48
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Using state processor API to read state defined with a TypeHint

Hi Alexis,

I think your setup is fine, but probably Java type erasure makes Flink consider 
the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing serializers 
(constructed manually)?
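
For example, something along these lines (the Set<String> value type is only an assumption; 
since Flink has no dedicated Set serializer, the value serializer is still created from the 
type information here):

ExecutionConfig executionConfig = new ExecutionConfig();
TypeSerializer<Long> keySerializer = LongSerializer.INSTANCE;
TypeSerializer<Set<String>> valueSerializer =
        TypeInformation.of(new TypeHint<Set<String>>() {}).createSerializer(executionConfig);
MapStateDescriptor<Long, Set<String>> msd =
        new MapStateDescriptor<>("descriptorName", keySerializer, valueSerializer);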

Regards,
Roman


On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa 
 wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses 
> MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor> msd = new MapStateDescriptor<>(
>
> "descriptorName",
>
> TypeInformation.of(Long.class),
>
> TypeInformation.of(new TypeHint>() {})
>
> );
>
>
>
> Now I’m trying to access a checkpoint’s state data to read that (created with 
> RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction Integer, String, TimeWindow> that defines the same descriptor and calls this 
> in readWindow:
>
>
>
> MapState> mapState = 
> context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to 
> configure the reader function like this:
>
>
>
> savepoint
>
> .window(SlidingEventTimeWindows.of(Time.minutes(11L), 
> Time.minutes(1L)))
>
> .process(
>
> "my-uid",
>
> new StateReaderFunction(),
>
> Types.STRING,
>
> TypeInformation.of(MyPojo.class),
>
> Types.INT
>
> )
>
> .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not 
> be incompatible with the old state serializer 
> (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>


Using state processor API to read state defined with a TypeHint

2022-04-08 Thread Alexis Sarda-Espinosa
Hi everyone,

I have a ProcessWindowFunction that uses Global window state. It uses MapState 
with a descriptor defined like this:

MapStateDescriptor> msd = new MapStateDescriptor<>(
"descriptorName",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint>() {})
);

Now I'm trying to access a checkpoint's state data to read that (created with 
RocksDB as backend in Flink 1.14.4). I have a WindowReaderFunction<MyPojo, Integer, 
String, TimeWindow> that defines the same descriptor and calls this in readWindow:

MapState> mapState = context.globalState().getMapState(msd);

After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to 
configure the reader function like this:

savepoint
    .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
    .process(
        "my-uid",
        new StateReaderFunction(),
        Types.STRING,
        TypeInformation.of(MyPojo.class),
        Types.INT
    )
    .print();

But I am getting this exception:

Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) 
must not be incompatible with the old state serializer 
(org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).

Does someone know what I'm doing wrong in my setup?

Regards,
Alexis.



RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-14 Thread Alexis Sarda-Espinosa
Hello,

There was a network issue in my environment and the job had to restart. After 
the job came back up, the logs showed a lot of lines like this:

RocksDBIncrementalRestoreOperation ... Starting to restore from state handle: 
...

Interestingly, those entries include information about sizes in bytes:

- 
445163.sst=ByteStreamStateHandle{handleName='file:/opt/flink/state/checkpoints//shared/18f95afa-dc66-467d-bd05-779895f24960',
 dataBytes=1328}
- privateState={MANIFEST-04=File State: 
file:/opt/flink/state/checkpoints//shared/bd7fde24-3ef6-4e05-bbd6-1474f8051d5d
 [80921331 bytes]

I extracted a lot of that information and I can see that:

- If I sum all dataBytes from sharedState, that only accounts for a couple MB.
- Most of the state comes from privateState, specifically from the entries 
referring to MANIFEST File State; that accounts for almost 1.5GB.

I believe that is one of the files RocksDB uses internally, but is that related 
to managed state used by my functions? Or does that indicate size growth is 
elsewhere?

Regards,
Alexis.

-Original Message-
From: Alexis Sarda-Espinosa  
Sent: Dienstag, 12. April 2022 15:39
To: ro...@apache.org
Cc: user@flink.apache.org
Subject: RE: RocksDB's state size discrepancy with what's seen with state 
processor API

Thanks for all the pointers. The UI does show combined state for a chain, but 
the only state descriptors inside that chain are the 3 I mentioned before. Its 
size is still increasing today, and duration is now around 30 seconds (I can't 
use unaligned checkpoints because I use partitionCustom).

I've executed the state processor program for all of the 50 chk-* folders, but 
I don't see anything weird. The counts go up and down depending on which one I 
load, but even the bigger ones have around 500-700 entries, which should only 
be a couple hundred KB; it's not growing monotonically.

The chain of operators is relatively simple:

timestampedStream = inputStream -> keyBy -> assignTimestampsAndWatermarks
windowedStream  = timestampedStream -> reinterpretAsKeyedStream -> 
window (SlidingEventTimeWindows)
windowedStream -> process1 -> sink1
windowedStream -> process2 -> sink2
windowedStream -> process3 -> map
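
Spelled out as code, the chain above is roughly the following (Event, the key selector, the 
watermark strategy and the process functions are placeholders):

SingleOutputStreamOperator<Event> timestampedStream = inputStream
        .keyBy(Event::getKey)
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, ts) -> event.getTimestamp()));

WindowedStream<Event, String, TimeWindow> windowedStream = DataStreamUtils
        .reinterpretAsKeyedStream(timestampedStream, Event::getKey)
        .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)));

windowedStream.process(new Process1()).addSink(sink1);
windowedStream.process(new Process2()).addSink(sink2);
windowedStream.process(new Process3()).map(new Process3Mapper());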

And according to the Low Watermark I see in the UI, event time is advancing 
correctly.

Could you confirm if Flink's own operators could be creating state in RocksDB? 
I assume the window operators save some information in the state as well.

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan 
Sent: Dienstag, 12. April 2022 14:06
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

/shared folder contains keyed state that is shared among different checkpoints 
[1]. Most of state should be shared in your case since you're using keyed state 
and incremental checkpoints.

When a checkpoint is loaded, the state that it shares with older checkpoints is 
loaded as well. I suggested to load different checkpoints (i.e. chk-* folders) 
and compare the numbers of objects in their states. To prevent the job from 
discarding the state, it can either be stopped for some time and then restarted 
from the latest checkpoint; or the number of retained checkpoints can be 
increased [2]. Copying isn't necessary.

Besides that, you can also check state sizes of operator in Flink Web UI (but 
not the sizes of individual states). If the operators are chained then their 
combined state size will be shown. To prevent this, you can disable chaining 
[3] (although this will have performance impact).

Individual checkpoint folders should be eventually removed (when the checkpoint 
is subsumed). However, this is not guaranteed: if there is any problem during 
deletion, it will be logged, but the job will not fail.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining

Regards,
Roman

On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
 wrote:
>
> Hi Roman,
>
> Maybe I'm misunderstanding the structure of the data within the checkpoint. 
> You suggest comparing counts of objects in different checkpoints, I assume 
> you mean copying my "checkpoints" folder at different times and comparing, 
> not comparing different "chk-*" folders in the same snapshot, right?
>
> I haven't executed the processor program with a newer checkpoint, but I did 
> look at the folder in the running system, and I noticed that most of the 
> chk-* folders have remained unchanged, there's only 1 or 2 new folders 
> corresp

Semantics of purging with global windows

2023-08-30 Thread Alexis Sarda-Espinosa
Hello,

According to the javadoc of TriggerResult.PURGE, "All elements in the
window are cleared and the window is discarded, without evaluating
the window function or emitting any elements."
However, I've noticed that using a GlobalWindow (with a custom trigger)
followed by an AggregateFunction will call the function's add() even when
the trigger result is PURGE.
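
For reference, a stripped-down trigger that always purges, just to illustrate the situation
(the real one obviously fires and purges based on custom conditions):

public class AlwaysPurgeTrigger extends Trigger<Object, GlobalWindow> {
    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
        // even though PURGE is returned, the aggregate function's add() has already been
        // called for this element before the trigger result is evaluated
        return TriggerResult.PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
    }
}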

It seems to me that this has been the behavior for a very long time:

https://github.com/apache/flink/commit/6cd8ceb10c841827cf89b74ecf5a0495a6933d53#diff-6d18531a35cddca6e5995c40c7a564fd711b998d567c4e167a401f76ca29a2bbR295-R299

Is that really necessary? I'm guessing that operator deals with all types
of windows, so I'm not sure how that affects other window types.

Regards,
Alexis.


Re: Failure to restore from last completed checkpoint

2023-09-08 Thread Alexis Sarda-Espinosa
Hello,

Just a shot in the dark here, but could it be related to
https://issues.apache.org/jira/browse/FLINK-32241 ?

Such failures can cause many exceptions, but I think the ones you've
included aren't pointing to the root cause, so I'm not sure if that issue
applies to you.

Regards,
Alexis.

On Fri, 8 Sept 2023, 20:43 Jacqlyn Bender via user, 
wrote:

> Hi Yanfei,
>
> We were never able to restore from a checkpoint, we ended up restoring
> from a savepoint as fallback. Would those logs suggest we failed to take a
> checkpoint before the job manager restarted? Our observability monitors
> showed no failed checkpoints.
>
> Here is an exception that occurred before the failure to restore from the
> checkpoint:
>
> java.io.IOException: Cannot register Closeable, registry is already
> closed. Closing argument.
>
> at
> org.apache.flink.util.AbstractAutoCloseableRegistry.registerCloseable(AbstractAutoCloseableRegistry.java:89)
> ~[a-pipeline-name.jar:1.0]
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:128)
> ~[flink-dist-1.17.1-shopify-81a88f8.jar:1.17.1-shopify-81a88f8]
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
> ~[flink-dist-1.17.1-shopify-81a88f8.jar:1.17.1-shopify-81a88f8]
>
> at
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
> ~[a-pipeline-name.jar:1.0]
>
> at
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
> ~[?:?] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> ~[?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ~[?:?]
>
> at java.lang.Thread.run(Thread.java:829) [?:?]
>
> Thanks,
> Jacqlyn
>
> On Thu, Sep 7, 2023 at 7:42 PM Yanfei Lei  wrote:
>
>> Hey Jacqlyn,
>> According to the stack trace, it seems that there is a problem when
>> the checkpoint is triggered. Is this the problem after the restore?
>> would you like to share some logs related to restoring?
>>
>> Best,
>> Yanfei
>>
>> Jacqlyn Bender via user  于2023年9月8日周五 05:11写道:
>> >
>> > Hey folks,
>> >
>> >
>> > We experienced a pipeline failure where our job manager restarted and
>> we were for some reason unable to restore from our last successful
>> checkpoint. We had regularly completed checkpoints every 10 minutes up to
>> this failure and 0 failed checkpoints logged. Using Flink version 1.17.1.
>> >
>> >
>> > Wondering if anyone can shed light on what might have happened?
>> >
>> >
>> > Here's the error from our logs:
>> >
>> >
>> > Message: FATAL: Thread ‘Checkpoint Timer’ produced an uncaught
>> exception. Stopping the process...
>> >
>> >
>> > extendedStackTrace: java.util.concurrent.CompletionException:
>> java.util.concurrent.CompletionException: java.lang.NullPointerException
>> >
>> > at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$8(CheckpointCoordinator.java:669)
>> ~[a-pipeline-name.jar:1.0]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:910)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> [?:?]
>> >
>> > at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>> >
>> > at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> [?:?]
>> >
>> > at java.lang.Thread.run(Thread.java:829) [?:?]
>> >
>> > Caused by: java.util.concurrent.CompletionException:
>> java.lang.NullPointerException
>> >
>> > at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:932)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>> ~[?:?]
>> >
>> > ... 7 more
>> >
>> > Caused by: java.lang.NullPointerException
>> >
>> > at
>> 

Re: Updating existing state with state processor API

2023-10-27 Thread Alexis Sarda-Espinosa
Hi Matthias,

Thanks for the response. I guess the specific question would be, if I work
with an existing savepoint and pass an empty DataStream to
OperatorTransformation#bootstrapWith, will the new savepoint end up with an
empty state for the modified operator, or will it maintain the existing
state because nothing was changed?

Regards,
Alexis.

Am Fr., 27. Okt. 2023 um 08:40 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Good morning Alexis,
>
>
>
> Something like this we do all the time.
>
> Read an existing savepoint, copy over some of the operator states (keyed/non-keyed)
> that are not to be changed, and process/patch the remaining ones by
> transforming and bootstrapping them to new state.
>
>
>
> I could share more details for more specific questions, if you like.
>
>
>
> Regards
>
>
>
> Thias
>
>
>
> PS: I’m currently working on this ticket in order to get some glitches
> removed: FLINK-26585 <https://issues.apache.org/jira/browse/FLINK-26585>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Thursday, October 26, 2023 4:01 PM
> *To:* user 
> *Subject:* Updating existing state with state processor API
>
>
>
> Hello,
>
>
>
> The documentation of the state processor API has some examples to modify
> an existing savepoint by defining a StateBootstrapTransformation. In all
> cases, the entrypoint is OperatorTransformation#bootstrapWith, which
> expects a DataStream. If I pass an empty DataStream to bootstrapWith and
> then apply the resulting transformation to an existing savepoint, will the
> transformation still receive data from the existing state?
>
>
>
> If the aforementioned is incorrect, I imagine I could instantiate
> a SavepointReader and create a DataStream of the existing state with it,
> which I could then pass to the bootstrapWith method directly or after
> "unioning" it with additional state. Would this work?
>
>
>
> Regards,
>
> Alexis.
>
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>


Updating existing state with state processor API

2023-10-26 Thread Alexis Sarda-Espinosa
Hello,

The documentation of the state processor API has some examples to modify an
existing savepoint by defining a StateBootstrapTransformation. In all
cases, the entrypoint is OperatorTransformation#bootstrapWith, which
expects a DataStream. If I pass an empty DataStream to bootstrapWith and
then apply the resulting transformation to an existing savepoint, will the
transformation still receive data from the existing state?

If the aforementioned is incorrect, I imagine I could instantiate
a SavepointReader and create a DataStream of the existing state with it,
which I could then pass to the bootstrapWith method directly or after
"unioning" it with additional state. Would this work?

Regards,
Alexis.


Side outputs documentation

2023-09-22 Thread Alexis Sarda-Espinosa
Hello,

very quick question, the documentation for side outputs states that an
OutputTag "needs to be an anonymous inner class, so that we can analyze the
type" (this is written in a comment in the example). Is this really true?
I've seen many examples where it's a static element and it seems to work
fine.
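
For comparison, the two variants in question (the element type here is just an example):

// anonymous subclass, as the documentation suggests, so the element type can be extracted:
final OutputTag<String> lateTag = new OutputTag<String>("late-data") {};

// explicit TypeInformation, no anonymous subclass required:
final OutputTag<String> lateTagExplicit = new OutputTag<>("late-data", Types.STRING);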

Regards,
Alexis.


Re: Side outputs documentation

2023-09-25 Thread Alexis Sarda-Espinosa
Hi Yunfeng,

Thanks for the response. I hadn't even seen the other constructor, but it
seems that the single-arg constructor works fine even if the output tag is
declared as "static final", at least in my use case. I imagine Flink would
complain about unknown types if it really can't figure it out
automatically, so maybe I can just let it be as long as tests pass, but I
was wondering if Flink really needs a non-static field to analyze type
information here. Who knows, maybe there are some scenarios where it's
really a must.

Regards,
Alexis.

Am Mo., 25. Sept. 2023 um 05:17 Uhr schrieb Yunfeng Zhou <
flink.zhouyunf...@gmail.com>:

> Hi Alexis,
>
> If you create OutputTag with the constructor `OutputTag(String id)`,
> you need to make it anonymous for Flink to analyze the type
> information. But if you use the constructor `OutputTag(String id,
> TypeInformation typeInfo)`, you need not make it anonymous as you
> have provided the type information.
>
> The second constructor was introduced after the documentation and the first
> constructor were written, so I think the documentation might be outdated and no
> longer match OutputTag's current behavior. A ticket and PR could be
> added to fix the documentation. What do you think?
>
> Best,
> Yunfeng
>
> On Fri, Sep 22, 2023 at 4:55 PM Alexis Sarda-Espinosa
>  wrote:
> >
> > Hello,
> >
> > very quick question, the documentation for side outputs states that an
> OutputTag "needs to be an anonymous inner class, so that we can analyze the
> type" (this is written in a comment in the example). Is this really true?
> I've seen many examples where it's a static element and it seems to work
> fine.
> >
> > Regards,
> > Alexis.
> >
>


Re: Continuous errors with Azure ABFSS

2023-09-28 Thread Alexis Sarda-Espinosa
Hi Surendra,

there are no exceptions in the logs, nor anything salient with
INFO/WARN/ERROR levels. The checkpoints are definitely completing, we even
set the config

execution.checkpointing.tolerable-failed-checkpoints: 1

Regards,
Alexis.

Am Do., 28. Sept. 2023 um 09:32 Uhr schrieb Surendra Singh Lilhore <
surendralilh...@gmail.com>:

> Hi Alexis,
>
> Could you please check the TaskManager log for any exceptions?
>
> Thanks
> Surendra
>
>
> On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> We are using ABFSS for RocksDB's backend as well as the storage dir
>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>> that every single operation contains failing transactions for the
>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>> know the storage account is only used by Flink. Checkpointing isn't
>> failing, but I wonder if this could be an issue in the long term?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Continuous errors with Azure ABFSS

2023-09-28 Thread Alexis Sarda-Espinosa
Hi Ram,

Thanks for that. We configure a path with ABFSS scheme in the following
settings:

- state.checkpoints.dir
- state.savepoints.dir
- high-availability.storageDir

We use RocksDB with incremental checkpointing every minute.

I found the metrics from Azure in the storage account under Monitoring,
Insights, Failures, scrolling down. I'll attach a screenshot here, although
I'm not sure that works well with the distribution list.

Regards,
Alexis.

Am Do., 28. Sept. 2023 um 07:28 Uhr schrieb ramkrishna vasudevan <
ramvasu.fl...@gmail.com>:

> Can you help with more info here?
> The RocksDB backend itself is in ABFS instead of local? Or you mean the
> checkpoint is in ABFS but local dir for RocksDB is in local storage?
>
> GetPathSTatus is done by your monitoring pages? We run Flink on ABFS so we
> would like to see if we can help you out.
>
> Regards
> Ram
>
> On Thu, Sep 28, 2023 at 2:06 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> We are using ABFSS for RocksDB's backend as well as the storage dir
>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>> that every single operation contains failing transactions for the
>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>> know the storage account is only used by Flink. Checkpointing isn't
>> failing, but I wonder if this could be an issue in the long term?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Side outputs documentation

2023-09-26 Thread Alexis Sarda-Espinosa
I see, sounds good, thanks for the clarification.

Am Di., 26. Sept. 2023 um 03:29 Uhr schrieb Yunfeng Zhou <
flink.zhouyunf...@gmail.com>:

> Hi Alexis,
>
> Thanks for the clarification. I found the second constructor on
> Flink's master branch here[1], and maybe it was that we had been
> commenting on different versions of Flink, and the second constructor
> has not been introduced in the version you use. From the source code I
> can see that the OutputTag need not be anonymous so long as the type
> extraction process passes, while making it anonymous guarantees the
> success of this step, so you are right that you need not bother about
> this matter so long as your tests and jobs can pass. Besides, I wonder
> whether being a static field influences the anonymity of a variable.
> To my understanding, making it anonymous means coding `new
> OutputTag("foobar"){}` instead of  `new
> OutputTag("foobar")`. It doesn't matter whether the prefix is
> `private OutputTag tag = new OutputTag("foobar"){}` or
> `private static OutputTag tag = new
> OutputTag("foobar"){}`. They should be independent from each
> other and OutputTag's document is correct from this aspect.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/OutputTag.java#L82
>
> Best,
> Yunfeng
>
> On Mon, Sep 25, 2023 at 10:57 PM Alexis Sarda-Espinosa
>  wrote:
> >
> > Hi Yunfeng,
> >
> > Thanks for the response. I hadn't even seen the other constructor, but
> it seems that the single-arg constructor works fine even if the output tag
> is declared as "static final", at least in my use case. I imagine Flink
> would complain about unknown types if it really can't figure it out
> automatically, so maybe I can just let it be as long as tests pass, but I
> was wondering if Flink really needs a non-static field to analyze type
> information here. Who knows, maybe there are some scenarios where it's
> really a must.
> >
> > Regards,
> > Alexis.
> >
> > Am Mo., 25. Sept. 2023 um 05:17 Uhr schrieb Yunfeng Zhou <
> flink.zhouyunf...@gmail.com>:
> >>
> >> Hi Alexis,
> >>
> >> If you create OutputTag with the constructor `OutputTag(String id)`,
> >> you need to make it anonymous for Flink to analyze the type
> >> information. But if you use the constructor `OutputTag(String id,
> >> TypeInformation typeInfo)`, you need not make it anonymous as you
> >> have provided the type information.
> >>
> >> The second constructor is introduced after the document and the first
> >> constructor, and I think the document might have been outdated and not
> >> match with OutputTag's current behavior. A ticket and PR could be
> >> added to fix the document. What do you think?
> >>
> >> Best,
> >> Yunfeng
> >>
> >> On Fri, Sep 22, 2023 at 4:55 PM Alexis Sarda-Espinosa
> >>  wrote:
> >> >
> >> > Hello,
> >> >
> >> > very quick question, the documentation for side outputs states that
> an OutputTag "needs to be an anonymous inner class, so that we can analyze
> the type" (this is written in a comment in the example). Is this really
> true? I've seen many examples where it's a static element and it seems to
> work fine.
> >> >
> >> > Regards,
> >> > Alexis.
> >> >
>


Continuous errors with Azure ABFSS

2023-09-27 Thread Alexis Sarda-Espinosa
Hello,

We are using ABFSS for RocksDB's backend as well as the storage dir
required for Kubernetes HA. In the Azure Portal's monitoring insights I see
that every single operation contains failing transactions for the
GetPathStatus API. Unfortunately I don't see any additional details, but I
know the storage account is only used by Flink. Checkpointing isn't
failing, but I wonder if this could be an issue in the long term?

Regards,
Alexis.


Re: Continuous errors with Azure ABFSS

2023-10-06 Thread Alexis Sarda-Espinosa
Yes, that also works correctly, at least based on the Kafka source we use
(we'd get an alert if it suddenly started consuming from a very old offset).

Regards,
Alexis.

On Thu, 5 Oct 2023, 19:36 ramkrishna vasudevan, 
wrote:

> Sorry for the late reply. Just in case you restart the job , is it able to
> safely use the checkpoint and get back to the checkpointed state?
>
> Regards
> Ram,
>
> On Thu, Sep 28, 2023 at 4:46 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Surendra,
>>
>> there are no exceptions in the logs, nor anything salient with
>> INFO/WARN/ERROR levels. The checkpoints are definitely completing, we even
>> set the config
>>
>> execution.checkpointing.tolerable-failed-checkpoints: 1
>>
>> Regards,
>> Alexis.
>>
>> Am Do., 28. Sept. 2023 um 09:32 Uhr schrieb Surendra Singh Lilhore <
>> surendralilh...@gmail.com>:
>>
>>> Hi Alexis,
>>>
>>> Could you please check the TaskManager log for any exceptions?
>>>
>>> Thanks
>>> Surendra
>>>
>>>
>>> On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> We are using ABFSS for RocksDB's backend as well as the storage dir
>>>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>>>> that every single operation contains failing transactions for the
>>>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>>>> know the storage account is only used by Flink. Checkpointing isn't
>>>> failing, but I wonder if this could be an issue in the long term?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>


Re: Question about serialization of java.util classes

2023-08-15 Thread Alexis Sarda-Espinosa
Check this answer: https://stackoverflow.com/a/64721838/5793905

You could then use, for example, something like: new
SetTypeInfo(Types.STRING) instead of Types.LIST(Types.STRING)
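
For instance, if a stream's elements are sets, something like this (SetTypeInfo is the class 
from the gist above, not a Flink built-in, so its exact constructor may differ):

DataStream<LinkedHashSet<String>> sets = env
        .fromElements(new LinkedHashSet<>(Arrays.asList("a", "b")))
        .returns(new SetTypeInfo<>(Types.STRING)); // hypothetical signature from the gist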

Am Di., 15. Aug. 2023 um 10:40 Uhr schrieb :

> Hello Alexis,
>
> Thank you for sharing the helper classes, but unfortunately I have no
> idea how to use them or how they might be able to help me. This is
> all very new to me and I honestly can't wrap my head around Flink's type
> information system.
>
> Best regards,
> Saleh.
>
> On 14 Aug 2023, at 4:05 PM, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
> Hello,
>
> AFAIK you cannot avoid TypeInformationFactory due to type erasure, nothing
> Flink can do about that. Here's an example of helper classes I've been
> using to support set serde in Flink POJOs, but note that it's hardcoded for
> LinkedHashSet, so you would have to create different implementations if you
> need to differentiate sorted sets:
>
> https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398
>
> Regards,
> Alexis.
>
>
> Am Mo., 14. Aug. 2023 um 12:14 Uhr schrieb :
>
>> Hi,
>>
>> Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
>> ```
>> package com.example;
>> import java.util.ArrayList;
>> import java.util.HashSet;
>> import java.util.TreeSet;
>>
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> public class App {
>> public static class Pojo {
>> public ArrayList list;
>> public HashSet set;
>> public TreeSet treeset;
>> public Pojo() {
>> this.list = new ArrayList<>();
>> this.set = new HashSet<>();
>> this.treeset = new TreeSet<>();
>> }
>> }
>> public static void main(String[] args) throws Exception {
>> var env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.getConfig().disableGenericTypes();
>> env.fromElements(new Pojo()).print();
>> env.execute("Pipeline");
>> }
>> }
>> ```
>>
>> The result of running:
>> ```
>> 13:08:20,074 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - class java.util.ArrayList does not contain a setter for field
>> size
>> 13:08:20,077 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Class class java.util.ArrayList cannot be used as a POJO
>> type because not all fields are valid POJO fields, and must be processed as
>> GenericType. Please read the Flink documentation on "Data Types
>> & Serialization" for details of the effect on performance and schema
>> evolution.
>> 13:08:20,078 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Field Pojo#list will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - No fields were detected for class java.util.HashSet so it
>> cannot be used as a POJO type and must be processed as GenericType. Please
>> read the Flink documentation on "Data Types & Serialization" for details
>> of the effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Field Pojo#set will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - No fields were detected for class java.util.TreeSet so it
>> cannot be used as a POJO type and must be processed as GenericType. Please
>> read the Flink documentation on "Data Types & Serialization" for details
>> of the effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Field Pojo#sset will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> WARNING: An illegal reflective access operation has occurred
>> WARNING: Illegal reflective access by
>> org.apache.flink.api.java.ClosureCleaner
>> (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar)

Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-22 Thread Alexis Sarda-Espinosa
Hi David,

I don't find it troublesome per se, I was rather trying to understand what 
should be expected (and documented) for my application. Before I restarted the 
job and changed some configurations, it ran for around 10 days and ended up 
with a state size of about 1.8GB, so I'm still not sure what is the upper bound 
in my scenario, or if that amount of "uncompacted garbage" is normal or not 
(for our throughput). This is important for us because we need to know how to 
size (disk space) the infrastructure, although it is also having a big impact 
on timings because each checkpoint ends up requiring 30+ seconds to complete, 
and they will eventually time out for sure.

I understand RocksDB has different sophisticated mechanisms, so I certainly 
don't expect one magic button that does exactly what I want, but ideally there 
would be a way to tune configuration in a way that a rough upper bound estimate 
of disk space can be deduced. Having some expired state for a while is 
expected; what I find odd is that it grows so fast: the size of the state 
quickly outpaces the size of the processed events, even though the state only 
persists a subset of information (some integer ids, string ids, longs for 
epochs).

At this point I think I can conclude that the "live" state from my operators is 
not growing indefinitely (based on what I see with the state processor API), so 
is there a way to get a better estimate of disk utilization other than letting 
the job run and wait? I've been reading through RocksDB documentation as well, 
but that might not be enough because I don't know how Flink handles its own 
framework state internally.

Regards,
Alexis.


From: David Anderson 
Sent: Friday, April 22, 2022 9:57 AM
To: Alexis Sarda-Espinosa 
Cc: ro...@apache.org ; user@flink.apache.org 

Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Alexis,

Compaction isn't an all-at-once procedure. RocksDB is organized as a series of 
levels, each 10x larger than the one below. There are a few different 
compaction algorithms available, and they are tunable, but what's typically 
happening during compaction is that one SST file at level n is being merged 
into the relevant SST files at level n+1. During this compaction procedure, 
obsolete and deleted entries are cleaned up. And several such compactions can 
be occurring concurrently. (Not to mention that each TM has its own independent 
RocksDB instance.)

It's not unusual for jobs with a small amount of state to end up with 
checkpoints of a few hundred MBs in size, where a lot of that is uncompacted 
garbage. If you find this troublesome, you could configure RocksDB to compact 
more frequently.
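
One way to do that, as a sketch, is a custom options factory registered via 
state.backend.rocksdb.options-factory; the values below are arbitrary examples rather 
than recommendations:

public class EagerCompactionOptionsFactory implements RocksDBOptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        return currentOptions
                .setLevel0FileNumCompactionTrigger(2)       // start compacting L0 earlier
                .setTargetFileSizeBase(32 * 1024 * 1024L);  // smaller SST files per level
    }
}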

David

On Thu, Apr 21, 2022 at 12:49 PM Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
 wrote:
Hello,

I enabled some of the RocksDB metrics and I noticed some additional things. 
After changing the configuration YAML, I restarted the cluster with a 
savepoint, and I can see that it only used 5.6MB on disk. Consequently, after 
the job switched to running state, the new checkpoints were also a few MB in 
size. After running for 1 day, checkpoint size is now around 100MB. From the 
metrics I can see with the Prometheus reporter:

- All entries for num-live-versions show 1
- All entries for compaction-pending show 0
- Most entries for estimate-num-keys are in the range of 0 to 100, although I 
see a few with 151 coming from 
flink_taskmanager_job_task_operator__timer_state_event_window_timers_rocksdb_estimate_num_keys

Is compaction expected after only 100MB? I imagine not, but if the savepoint 
shows that the effective amount of data is so low, size growth still seems far 
too large. In fact, if I only look at the UI, Bytes Received for the relevant 
SubTasks is about 14MB, yet the latest checkpoint already shows a Data Size of 
75MB for said SubTasks.

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan mailto:ro...@apache.org>>
Sent: Mittwoch, 20. April 2022 10:37
To: Alexis Sarda-Espinosa 
mailto:alexis.sarda-espin...@microfocus.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

State Processor API works on a higher level and is not aware of any RocksDB 
specifics (in fact, it can be used with any backend).

Regards,
Roman

On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa

mailto:alexis.sarda-espin...@microfocus.com>>
 wrote:
>
> I can look into RocksDB metrics, I need to configure Prometheus at some point 
> anyway. However, going back to the original question, is there no way to gain 
> more insight into this with the state processor API? You've mentioned 
> potential issues (too many states, missing compaction) but, with my 
> admittedly limited understanding of the way RocksDB is used in Flink, 

RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-21 Thread Alexis Sarda-Espinosa
Hello,

I enabled some of the RocksDB metrics and I noticed some additional things. 
After changing the configuration YAML, I restarted the cluster with a 
savepoint, and I can see that it only used 5.6MB on disk. Consequently, after 
the job switched to running state, the new checkpoints were also a few MB in 
size. After running for 1 day, checkpoint size is now around 100MB. From the 
metrics I can see with the Prometheus reporter:

- All entries for num-live-versions show 1
- All entries for compaction-pending show 0
- Most entries for estimate-num-keys are in the range of 0 to 100, although I 
see a few with 151 coming from 
flink_taskmanager_job_task_operator__timer_state_event_window_timers_rocksdb_estimate_num_keys

Is compaction expected after only 100MB? I imagine not, but if the savepoint 
shows that the effective amount of data is so low, size growth still seems far 
too large. In fact, if I only look at the UI, Bytes Received for the relevant 
SubTasks is about 14MB, yet the latest checkpoint already shows a Data Size of 
75MB for said SubTasks.

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Mittwoch, 20. April 2022 10:37
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

State Processor API works on a higher level and is not aware of any RocksDB 
specifics (in fact, it can be used with any backend).

Regards,
Roman

On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa

 wrote:
>
> I can look into RocksDB metrics, I need to configure Prometheus at some point 
> anyway. However, going back to the original question, is there no way to gain 
> more insight into this with the state processor API? You've mentioned 
> potential issues (too many states, missing compaction) but, with my 
> admittedly limited understanding of the way RocksDB is used in Flink, I would 
> have thought that such things would be visible when using the state 
> processor. Is there no way for me to "parse" those MANIFEST files with some 
> of Flink's classes and get some more hints?
>
> Regards,
> Alexis.
>
> 
> From: Roman Khachatryan 
> Sent: Tuesday, April 19, 2022 5:51 PM
> To: Alexis Sarda-Espinosa 
> Cc: user@flink.apache.org 
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
> > I assume that when you say "new states", that is related to new descriptors 
> > with different names? Because, in the case of windowing for example, each 
> > window "instance" has its own scoped (non-global and keyed) state, but 
> > that's not regarded as a separate column family, is it?
> Yes, that's what I meant, and that's regarded as the same column family.
>
> Another possible reason is that SST files aren't being compacted and 
> that increases the MANIFEST file size.
> I'd check the total number of loaded SST files and the creation date 
> of the oldest one.
>
> You can also see whether there are any compactions running via RocksDB 
> metrics [1] [2] (a reporter needs to be configured [3]).
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-num-running-compactions
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-compaction-pending
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#reporters
>
> Regards,
> Roman
>
> On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa 
>  wrote:
> >
> > Hi Roman,
> >
> > I assume that when you say "new states", that is related to new descriptors 
> > with different names? Because, in the case of windowing for example, each 
> > window "instance" has its own scoped (non-global and keyed) state, but 
> > that's not regarded as a separate column family, is it?
> >
> > For the 3 descriptors I mentioned before, they are only instantiated once 
> > and used like this:
> >
> > - Window list state: each call to process() executes 
> > context.windowState().getListState(...).get()
> > - Global map state: each call to process() executes 
> > context.globalState().getMapState(...)
> > - Global list state: within open(), runtimeContext.getListState(...) is 
> > executed once and used throughout the life of the operator.
> >
> > According to [1], the two ways of using global state should be equivalent.
> >
> > I will say that some of the operators instantiate the state descriptor in 
> > their constructors, i.e. before they are serialized to the TM, but the 
> > descriptors are Serializabl

RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-19 Thread Alexis Sarda-Espinosa
Hi Roman,

I assume that when you say "new states", that is related to new descriptors 
with different names? Because, in the case of windowing for example, each 
window "instance" has its own scoped (non-global and keyed) state, but that's 
not regarded as a separate column family, is it?

For the 3 descriptors I mentioned before, they are only instantiated once and 
used like this:

- Window list state: each call to process() executes 
context.windowState().getListState(...).get()
- Global map state: each call to process() executes 
context.globalState().getMapState(...)
- Global list state: within open(), runtimeContext.getListState(...) is 
executed once and used throughout the life of the operator.

According to [1], the two ways of using global state should be equivalent.
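As a minimal sketch of that access pattern (the class, descriptor and type names below are made up, not the actual job code):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class StateAccessSketch extends ProcessWindowFunction<Long, Long, String, TimeWindow> {

    // descriptors instantiated once; they are Serializable
    private final ListStateDescriptor<Long> windowListDesc =
            new ListStateDescriptor<>("window-list", Long.class);
    private final MapStateDescriptor<String, Long> globalMapDesc =
            new MapStateDescriptor<>("global-map", String.class, Long.class);
    private final ListStateDescriptor<Long> globalListDesc =
            new ListStateDescriptor<>("global-list", Long.class);

    private transient ListState<Long> globalList;

    @Override
    public void open(Configuration parameters) {
        // global keyed state acquired once via the runtime context
        globalList = getRuntimeContext().getListState(globalListDesc);
    }

    @Override
    public void process(String key, Context context, Iterable<Long> elements,
                        Collector<Long> out) throws Exception {
        // state scoped to this window instance
        Iterable<Long> buffered = context.windowState().getListState(windowListDesc).get();
        // global keyed state accessed through the window context
        MapState<String, Long> globalMap = context.globalState().getMapState(globalMapDesc);
        // ... use buffered, globalMap and globalList ...
    }
}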

I will say that some of the operators instantiate the state descriptor in their 
constructors, i.e. before they are serialized to the TM, but the descriptors 
are Serializable, so I imagine that's not relevant.

[1] https://stackoverflow.com/a/50510054/5793905

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Dienstag, 19. April 2022 11:48
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Alexis,

Thanks a lot for the information,

MANIFEST files list RocksDB column families (among other info); ever growing 
size of these files might indicate that some new states are constantly being 
created.
Could you please confirm that the number of state names is constant?

> Could you confirm if Flink's own operators could be creating state in 
> RocksDB? I assume the window operators save some information in the state as 
> well.
That's correct, window operators maintain a list of elements per window and a 
set of timers (timestamps). These states' names should be fixed (something like 
"window-contents" and "window-timers").

> is that related to managed state used by my functions? Or does that indicate 
> size growth is elsewhere?
The same mechanism is used for both Flink internal state and operator state, so 
it's hard to say without at least knowing the state names.


Regards,
Roman


On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan  wrote:
>
> /shared folder contains keyed state that is shared among different 
> checkpoints [1]. Most of state should be shared in your case since 
> you're using keyed state and incremental checkpoints.
>
> When a checkpoint is loaded, the state that it shares with older 
> checkpoints is loaded as well. I suggested to load different 
> checkpoints (i.e. chk-* folders) and compare the numbers of objects in 
> their states. To prevent the job from discarding the state, it can 
> either be stopped for some time and then restarted from the latest 
> checkpoint; or the number of retained checkpoints can be increased 
> [2]. Copying isn't necessary.
>
> Besides that, you can also check state sizes of operator in Flink Web 
> UI (but not the sizes of individual states). If the operators are 
> chained then their combined state size will be shown. To prevent this, 
> you can disable chaining [3] (although this will have performance 
> impact).
>
> Individual checkpoint folders should be eventually removed (when the 
> checkpoint is subsumed). However, this is not guaranteed: if there is 
> any problem during deletion, it will be logged, but the job will not 
> fail.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining
>
> Regards,
> Roman
>
> On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
>  wrote:
> >
> > Hi Roman,
> >
> > Maybe I'm misunderstanding the structure of the data within the checkpoint. 
> > You suggest comparing counts of objects in different checkpoints, I assume 
> > you mean copying my "checkpoints" folder at different times and comparing, 
> > not comparing different "chk-*" folders in the same snapshot, right?
> >
> > I haven't executed the processor program with a newer checkpoint, but I did 
> > look at the folder in the running system, and I noticed that most of the 
> > chk-* folders have remained unchanged, there's only 1 or 2 new folders 
> > corresponding to newer checkpoints. I would think this makes sense since 
> > the configuration specifies that only 1 completed checkpoint should be 
> > retained, but then why are the older chk-* folders still there? I did 
> > trigger a manual r

Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-19 Thread Alexis Sarda-Espinosa
I can look into RocksDB metrics, I need to configure Prometheus at some point 
anyway. However, going back to the original question, is there no way to gain 
more insight into this with the state processor API? You've mentioned potential 
issues (too many states, missing compaction) but, with my admittedly limited 
understanding of the way RocksDB is used in Flink, I would have thought that 
such things would be visible when using the state processor. Is there no way 
for me to "parse" those MANIFEST files with some of Flink's classes and get 
some more hints?

Regards,
Alexis.


From: Roman Khachatryan 
Sent: Tuesday, April 19, 2022 5:51 PM
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org 
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

> I assume that when you say "new states", that is related to new descriptors 
> with different names? Because, in the case of windowing for example, each 
> window "instance" has its own scoped (non-global and keyed) state, but that's 
> not regarded as a separate column family, is it?
Yes, that's what I meant, and that's regarded as the same column family.

Another possible reason is that SST files aren't being compacted and
that increases the MANIFEST file size.
I'd check the total number of loaded SST files and the creation date
of the oldest one.

You can also see whether there are any compactions running via RocksDB
metrics [1] [2] (a reporter needs to be configured [3]).

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-num-running-compactions
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-compaction-pending
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#reporters
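
For example, something along these lines in flink-conf.yaml would expose those two metrics plus a Prometheus reporter (the reporter keys can differ between Flink versions, so treat this as a sketch):

# RocksDB native metrics from [1] and [2]
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.compaction-pending: true

# Prometheus reporter (Flink 1.14-style keys, see [3])
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249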

Regards,
Roman

On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa wrote:
>
> Hi Roman,
>
> I assume that when you say "new states", that is related to new descriptors 
> with different names? Because, in the case of windowing for example, each 
> window "instance" has its own scoped (non-global and keyed) state, but that's 
> not regarded as a separate column family, is it?
>
> For the 3 descriptors I mentioned before, they are only instantiated once and 
> used like this:
>
> - Window list state: each call to process() executes 
> context.windowState().getListState(...).get()
> - Global map state: each call to process() executes 
> context.globalState().getMapState(...)
> - Global list state: within open(), runtimeContext.getListState(...) is 
> executed once and used throughout the life of the operator.
>
> According to [1], the two ways of using global state should be equivalent.
>
> I will say that some of the operators instantiate the state descriptor in 
> their constructors, i.e. before they are serialized to the TM, but the 
> descriptors are Serializable, so I imagine that's not relevant.
>
> [1] https://stackoverflow.com/a/50510054/5793905
>
> Regards,
> Alexis.
>
> -Original Message-
> From: Roman Khachatryan 
> Sent: Dienstag, 19. April 2022 11:48
> To: Alexis Sarda-Espinosa 
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state 
> processor API
>
> Hi Alexis,
>
> Thanks a lot for the information,
>
> MANIFEST files list RocksDB column families (among other info); ever growing 
> size of these files might indicate that some new states are constantly being 
> created.
> Could you please confirm that the number of state names is constant?
>
> > Could you confirm if Flink's own operators could be creating state in 
> > RocksDB? I assume the window operators save some information in the state 
> > as well.
> That's correct, window operators maintain a list of elements per window and a 
> set of timers (timestamps). These states' names should be fixed (something 
> like "window-contents" and "window-timers").
>
> > is that related to managed state used by my functions? Or does that 
> > indicate size growth is elsewhere?
> The same mechanism is used for both Flink internal state and operator state, 
> so it's hard to say without at least knowing the state names.
>
>
> Regards,
> Roman
>
>
> On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan  wrote:
> >
> > /shared folder contains keyed state that is shared among different
> > checkpoints [1]. Most of state should be shared in your case since
> > you're using keyed state and incremental checkpoints.
> >
> > When a checkpoint is loaded, the state that it shares with older
> > checkpoints is loaded as well. I suggested to load different checkpoints (i.e. chk-* folders) and compare the numbers of objects in their states.

RE: RocksDB state not cleaned up

2022-04-08 Thread Alexis Sarda-Espinosa
May I ask if anyone tested RocksDB’s periodic compaction in the meantime? And 
if yes, if it helped with this case.

Regards,
Alexis.

From: tao xiao 
Sent: Samstag, 18. September 2021 05:01
To: David Morávek 
Cc: Yun Tang ; user 
Subject: Re: RocksDB state not cleaned up

Thanks for the feedback! However, the TTL test already shows that the state 
cannot be cleaned up in time due to too many levels built up in RocksDB.

Hi @Yun Tang do you have any suggestions to tune 
RocksDB to accelerate the compaction progress?

On Fri, Sep 17, 2021 at 8:01 PM David Morávek <d...@apache.org> wrote:
Cleaning up with timers should solve this. Both approaches have some advantages 
and disadvantages though.

Timers:
- No "side effects".
- Can be set in event time. Deletes are regular tombstones that will get 
compacted later on.

TTL:
- Performance. This costs literally nothing compared to an extra state for 
timer + writing a tombstone marker.
- Has "side-effects", because it works in processing time. This is just 
something to keep in mind eg. when bootstraping the state from historical data. 
(large event time / processing time skew)

With 1.14 release, we've bumped the RocksDB version so it may be possible to 
use a "periodic compaction" [1], but nobody has tried that so far. In the 
meantime I think there is no real workaround because we don't expose a way to 
trigger manual compaction.

I'm off to vacation until 27th and I won't be responsive during that time. I'd 
like to pull Yun into the conversation as he's super familiar with the RocksDB 
state backend.

[1] 
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction
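
If someone wants to experiment, an untested sketch of wiring that up through a custom options factory could look like this (it assumes the RocksDB version bundled with Flink 1.14+ exposes setPeriodicCompactionSeconds in its Java API):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.util.Collection;

// Untested sketch: enable RocksDB's periodic compaction via a custom options
// factory; the 1-day value is arbitrary.
public class PeriodicCompactionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
                                     Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        // re-compact SST files that haven't been compacted for ~1 day
        return currentOptions.setPeriodicCompactionSeconds(24 * 60 * 60);
    }
}

// usage (env is the StreamExecutionEnvironment):
// EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
// backend.setRocksDBOptions(new PeriodicCompactionOptionsFactory());
// env.setStateBackend(backend);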

Best,
D.

On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xiaotao...@gmail.com> wrote:
Hi David,

Confirmed with RocksDB log Stephan's observation is the root cause that 
compaction doesn't clean up the high level sst files fast enough.  Do you think 
manual clean up by registering a timer is the way to go or any RocksDB 
parameter can be tuned to mitigate this issue?

On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xiaotao...@gmail.com> wrote:
Hi David,

If I read Stephan's comment correctly, TTL doesn't work well for cases where we 
have too many levels, like fast-growing state, as compaction doesn't clean up 
high-level SST files in time. Is this correct? If yes, should we register a 
timer with the TTL time and manually clean up the state (state.clear()) when 
the timer fires?

I will turn on RocksDB logging as well as compaction logging [1] to verify this

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction


On Tue, Sep 14, 2021 at 5:38 PM David Morávek <d...@apache.org> wrote:
Hi Tao,

my intuition is that the compaction of SST files is not triggering. By default, 
it's only triggered by the size ratios of different levels [1] and the TTL 
mechanism has no effect on it.

Some reasoning from Stephan:

It's very likely to have large files in higher levels that haven't been 
compacted in a long time and thus just stay around.

This might be especially possible if you insert a lot in the beginning (build 
up many levels) and then have a moderate rate of modifications, so the changes 
and expiration keep happening purely in the merges / compactions of the first 
levels. Then the later levels may stay unchanged for quite some time.

You should be able to see compaction details by setting RocksDB logging to INFO 
[2]. Can you please check these and validate whether this really is the case?

[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
[2] 
https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting

Best,
D.

On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xiaotao...@gmail.com> wrote:
Hi team

We have a job that uses value state with RocksDB and TTL set to 1 day. The TTL 
update type is OnCreateAndWrite. We set the value state when the value state 
doesn't exist and we never update it again after the state is not empty. The 
key of the value state is timestamp. My understanding of such TTL settings is 
that the size of all SST files remains flat (let's disregard the impact space 
amplification brings) after 1 day as the daily data volume is more or less the 
same. However, the RocksDB native metrics show that the SST files continue to 
grow since I started the job. I checked the SST files in local storage and I 
can see SST files that are 1 month old (from when I started the job). What is 
the possible reason for the SST files not being cleaned up?

The Flink version is 1.12.1
State backend is RocksDB with incremental checkpoint
All default configuration for RocksDB
Per job mode in Yarn and checkpoint to S3


Here is the code to set value state

public void open(Configuration parameters) {
StateTtlConfig ttlConfigClick = StateTtlConfig
.newBuilder(Time.days(1))
   

RocksDB's state size discrepancy with what's seen with state processor API

2022-04-08 Thread Alexis Sarda-Espinosa
Hello,

I have a streaming job running on Flink 1.14.4 that uses managed state with 
RocksDB with incremental checkpoints as backend. I've been monitoring a dev 
environment that has been running for the last week and I noticed that state 
size and end-to-end duration have been increasing steadily. Currently, duration 
is 11 seconds and size is 917MB (as shown in the UI). The tasks with the 
largest state (614MB) come from keyed sliding windows. Some attributes of this 
job's setup:


  *   Windows are 11 minutes in size.
  *   Slide time is 1 minute.
  *   Throughput is approximately 20 events per minute.

I have 3 operators with these states:


  1.  Window state with ListState and no TTL.
  2.  Global window state with MapState> and a TTL of 1 hour 
(with cleanupInRocksdbCompactFilter(1000L)).
  3.  Global window state with ListState where the Pojo has an int and a 
long, a TTL of 1 hour, and configured with cleanupInRocksdbCompactFilter(1000L) 
as well.

Both operators with global window state have logic to manually remove old state 
in addition to configured TTL. The other operator does override and call 
clear().
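
For reference, the TTL configuration attached to those descriptors is roughly the following (the descriptor name and types here are illustrative, not the job's actual ones):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// 1-hour TTL with the RocksDB compaction filter cleanup strategy
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))
        .cleanupInRocksdbCompactFilter(1000L)
        .build();

MapStateDescriptor<String, Long> globalMapDesc =
        new MapStateDescriptor<>("global-map", String.class, Long.class);
globalMapDesc.enableTimeToLive(ttlConfig);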

I have now analyzed the checkpoint folder with the state processor API, and 
I'll note here that I see 50 folders named chk-*** even though I don't set 
state.checkpoints.num-retained and the default should be 1. I loaded the data 
from the folder with the highest chk number and I see that my operators have 
these amounts respectively:


  1.  10 entries
  2.  80 entries
  3.  200 entries

I got those numbers with something like this:

savepoint
.window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
.process(...)
.collect()
.parallelStream()
.reduce(0, Integer::sum);

Where my WindowReaderFunction classes just count the number of entries in each 
call to readWindow.
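
For reference, such a counting reader function looks roughly like this (a sketch; the actual classes read the job's own key and element types):

import org.apache.flink.state.api.functions.WindowReaderFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Emits the number of entries found in each window, so the final
// reduce(0, Integer::sum) yields the total number of entries.
public class CountingReader extends WindowReaderFunction<Long, Integer, String, TimeWindow> {

    @Override
    public void readWindow(String key, Context<TimeWindow> context,
                           Iterable<Long> elements, Collector<Integer> out) {
        int count = 0;
        for (Long ignored : elements) {
            count++;
        }
        out.collect(count);
    }
}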

Those amounts cannot possibly account for 614MB, so what am I missing?

Regards,
Alexis.



RE: Interval join operator is not forwarding watermarks correctly

2022-03-17 Thread Alexis Sarda-Espinosa
Hi Dawid,

Thanks for the update, I also managed to work around it by adding another 
watermark assignment operator between the join and the window. I’ll have to see 
if it’s possible to assign watermarks at the source, but even if it is, I worry 
that the different partitions created by all my keyBy() steps would make it 
difficult for me to figure out which parts of my pipeline should be idle.
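
In case it helps, the workaround looks roughly like this (the stream/type names and durations are placeholders, not the actual job's values):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// joinedStream is the output of the interval join; SomeEvent and the
// durations below are placeholders.
DataStream<SomeEvent> rewatermarked = joinedStream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<SomeEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                .withTimestampAssigner((event, ts) -> event.getTimestamp())
                .withIdleness(Duration.ofMinutes(1)));
// the event-time window is then applied on rewatermarked instead of joinedStream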

Regards,
Alexis.


Re: Continuous errors with Azure ABFSS

2023-11-10 Thread Alexis Sarda-Espinosa
After enabling some more logging for the storage account, I figured out the
errors correspond to 404 PathNotFound responses. My guess is the file
system checks the status of a path to see if it exists or not before
trying to write to it, in this case for _metadata files from each new
checkpoint. Seems like normal operations, so it's just unfortunate the
Azure API exposes that as continuous ClientOtherError metrics.

Regards,
Alexis.

Am Fr., 6. Okt. 2023 um 08:10 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Yes, that also works correctly, at least based on the Kafka source we use
> (we'd get an alert if it suddenly started consuming from a very old offset).
>
> Regards,
> Alexis.
>
> On Thu, 5 Oct 2023, 19:36 ramkrishna vasudevan, 
> wrote:
>
>> Sorry for the late reply. Just in case you restart the job , is it able
>> to safely use the checkpoint and get back to the checkpointed state?
>>
>> Regards
>> Ram,
>>
>> On Thu, Sep 28, 2023 at 4:46 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hi Surendra,
>>>
>>> there are no exceptions in the logs, nor anything salient with
>>> INFO/WARN/ERROR levels. The checkpoints are definitely completing, we even
>>> set the config
>>>
>>> execution.checkpointing.tolerable-failed-checkpoints: 1
>>>
>>> Regards,
>>> Alexis.
>>>
>>> Am Do., 28. Sept. 2023 um 09:32 Uhr schrieb Surendra Singh Lilhore <
>>> surendralilh...@gmail.com>:
>>>
>>>> Hi Alexis,
>>>>
>>>> Could you please check the TaskManager log for any exceptions?
>>>>
>>>> Thanks
>>>> Surendra
>>>>
>>>>
>>>> On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We are using ABFSS for RocksDB's backend as well as the storage dir
>>>>> required for Kubernetes HA. In the Azure Portal's monitoring insights I 
>>>>> see
>>>>> that every single operation contains failing transactions for the
>>>>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>>>>> know the storage account is only used by Flink. Checkpointing isn't
>>>>> failing, but I wonder if this could be an issue in the long term?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>


RE: Schema Evolution of POJOs fails on Field Removal

2022-05-18 Thread Alexis Sarda-Espinosa
Hi David,

Please refer to https://issues.apache.org/jira/browse/FLINK-21752

Regards,
Alexis.

-Original Message-
From: David Jost  
Sent: Mittwoch, 18. Mai 2022 15:07
To: user@flink.apache.org
Subject: Schema Evolution of POJOs fails on Field Removal

Hi,

we currently have an issue where our job fails to restart from a savepoint 
after we removed a field from a serialised (POJO) class. According to [0], this 
kind of evolution is supported, but sadly it only works when adding fields, not 
when removing them.

I was hoping, someone here might be able to help or have a pointer on where to 
continue the search for a solution.

Thank you in advance.

Best
  David


Re: Making Kafka source respect offset changed externally

2022-07-20 Thread Alexis Sarda-Espinosa
Hello again,

I just performed a test
using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). I
did a few tests in the following order, and I noticed a few weird things.
Note that our job uses Processing Time windows, so watermarks are
irrelevant.

1. After the job had been running for a while, we manually moved the
consumer group's offset to 12 hours in the past [1] (without restarting the
job).
  - After this, the consumer simply stopped reading messages - the consumer
lag in Kafka stayed at around 150k (no new data arrived)

2. We restarted the job with a checkpoint.
  - The consumer lag in Kafka dropped down to 0, but no data was
emitted from the windows.

3. We stopped the job, moved the offset again, and restarted without any
checkpoint/savepoint.
  - This time the consumer correctly processed the backlog and emitted
events from the windows.

This was done with Flink 1.15.0.
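
For context, the source for this test was built roughly like this (broker/topic names are placeholders, and env is the StreamExecutionEnvironment):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("our-topic")
        .setGroupId("our-group")
        // start from the group's committed offsets, falling back to latest
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// watermarks don't matter here since the job uses processing-time windows
DataStream<String> stream = env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "kafka-source");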

Is this expected? In other words, if there's a mismatch between Flink's
state's offset and Kafka's offset, will the job be unable to run?



[1] The command to move the offset was:

kafka-consumer-groups.sh \
  --bootstrap-server ... \
  --topic our-topic \
  --group our-group \
  --command-config kafka-preprod.properties \
  --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
  --execute

Regards,
Alexis.

Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hi Yaroslav,
>
> The test I did was just using earliest, I'll test with committed offset
> again, thanks.
>
> Regards,
> Alexis.
>
> On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko, 
> wrote:
>
>> Hi Alexis,
>>
>> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
>> consumer offsets? In this case, it should get the offsets from Kafka and
>> not the state.
>>
>> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Regarding the new Kafka source (configure with a consumer group), I
>>> found out that if I manually change the group's offset with Kafka's admin
>>> API independently of Flink (while the job is running), the Flink source
>>> will ignore that and reset it to whatever it stored internally. Is there
>>> any way to prevent this?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Re: Making Kafka source respect offset changed externally

2022-07-21 Thread Alexis Sarda-Espinosa
I would suggest updating the documentation to include that statement.

I imagine dynamic partition discovery has no effect on this?

Regards,
Alexis.

Am Do., 21. Juli 2022 um 10:03 Uhr schrieb Chesnay Schepler <
ches...@apache.org>:

> Flink only reads the offsets from Kafka when the job is initially started
> from a clear slate.
> Once checkpoints are involved it only relies on offsets stored in the
> state.
>
> On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:
>
> Hello again,
>
> I just performed a test
> using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). I
> did a few tests in the following order, and I noticed a few weird things.
> Note that our job uses Processing Time windows, so watermarks are
> irrelevant.
>
> 1. After the job had been running for a while, we manually moved the
> consumer group's offset to 12 hours in the past [1] (without restarting the
> job).
>   - After this, the consumer simply stopped reading messages - the
> consumer lag in Kafka stayed at around 150k (no new data arrived)
>
> 2. We restarted the job with a checkpoint.
>   - The consumer lag in Kafka dropped down to 0, but no data was
> emitted from the windows.
>
> 3. We stopped the job, moved the offset again, and restarted without any
> checkpoint/savepoint.
>   - This time the consumer correctly processed the backlog and emitted
> events from the windows.
>
> This was done with Flink 1.15.0.
>
> Is this expected? In other words, if there's a mismatch between Flink's
> state's offset and Kafka's offset, will the job be unable to run?
>
>
>
> [1] The command to move the offset was:
>
> kafka-consumer-groups.sh \
>   --bootstrap-server ... \
>   --topic our-topic \
>   --group our-group \
>   --command-config kafka-preprod.properties \
>   --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
>   --execute
>
> Regards,
> Alexis.
>
> Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com>:
>
>> Hi Yaroslav,
>>
>> The test I did was just using earliest, I'll test with committed offset
>> again, thanks.
>>
>> Regards,
>> Alexis.
>>
>> On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko, 
>> wrote:
>>
>>> Hi Alexis,
>>>
>>> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
>>> consumer offsets? In this case, it should get the offsets from Kafka and
>>> not the state.
>>>
>>> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Regarding the new Kafka source (configure with a consumer group), I
>>>> found out that if I manually change the group's offset with Kafka's admin
>>>> API independently of Flink (while the job is running), the Flink source
>>>> will ignore that and reset it to whatever it stored internally. Is there
>>>> any way to prevent this?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
>


Making Kafka source respect offset changed externally

2022-07-14 Thread Alexis Sarda-Espinosa
Hello,

Regarding the new Kafka source (configure with a consumer group), I found
out that if I manually change the group's offset with Kafka's admin API
independently of Flink (while the job is running), the Flink source will
ignore that and reset it to whatever it stored internally. Is there any way
to prevent this?

Regards,
Alexis.


Did the semantics of Kafka's earliest offset change with the new source API?

2022-07-13 Thread Alexis Sarda-Espinosa
Hello,

I have a job running with Flink 1.15.0 that consumes from Kafka with the
new KafkaSource API, setting a group ID explicitly and specifying
OffsetsInitializer.earliest() as a starting offset. Today I restarted the
job ignoring both savepoint and checkpoint, and the consumer started
reading from the first available message in the broker (from 24 hours ago),
i.e. it completely ignored the offsets that were committed to Kafka. If I
use OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
instead, the problem seems to go away.

With the previous FlinkKafkaConsumer, using earliest didn't cause any such
issues. Was this changed in the aforementioned way on purpose?

Regards,
Alexis.


Re: Making Kafka source respect offset changed externally

2022-07-14 Thread Alexis Sarda-Espinosa
Hi Yaroslav,

The test I did was just using earliest, I'll test with committed offset
again, thanks.

Regards,
Alexis.

On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko,  wrote:

> Hi Alexis,
>
> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
> consumer offsets? In this case, it should get the offsets from Kafka and
> not the state.
>
> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Regarding the new Kafka source (configure with a consumer group), I found
>> out that if I manually change the group's offset with Kafka's admin API
>> independently of Flink (while the job is running), the Flink source will
>> ignore that and reset it to whatever it stored internally. Is there any way
>> to prevent this?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Did the semantics of Kafka's earliest offset change with the new source API?

2022-07-18 Thread Alexis Sarda-Espinosa
Hi David,

thanks for the info, indeed I had misunderstood our old configuration, we
didn't use earliest before, we just used the default.

Regards,
Alexis.

Am Fr., 15. Juli 2022 um 14:46 Uhr schrieb David Anderson <
dander...@apache.org>:

> What did change was the default starting position when not starting from a
> checkpoint. With FlinkKafkaConsumer, it starts from the committed offsets
> by default. With KafkaSource, it starts from the earliest offset.
>
> David
>
> On Fri, Jul 15, 2022 at 5:57 AM Chesnay Schepler 
> wrote:
>
>> I'm not sure about the previous behavior, but at the very least according
>> to the documentation the behavior is identical.
>>
>> 1.12:
>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>
>> setStartFromEarliest() / setStartFromLatest(): Start from the
>> earliest / latest record. Under these modes, committed offsets in Kafka
>> will be ignored and not used as starting positions.
>>
>> On 13/07/2022 18:53, Alexis Sarda-Espinosa wrote:
>>
>> Hello,
>>
>> I have a job running with Flink 1.15.0 that consumes from Kafka with the
>> new KafkaSource API, setting a group ID explicitly and specifying
>> OffsetsInitializer.earliest() as a starting offset. Today I restarted the
>> job ignoring both savepoint and checkpoint, and the consumer started
>> reading from the first available message in the broker (from 24 hours ago),
>> i.e. it completely ignored the offsets that were committed to Kafka. If I
>> use OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>> instead, the problem seems to go away.
>>
>> With the previous FlinkKafkaConsumer, using earliest didn't cause any
>> such issues. Was this changed in the aforementioned way on purpose?
>>
>> Regards,
>> Alexis.
>>
>>
>>


RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-05-02 Thread Alexis Sarda-Espinosa
Ok

Regards,
Alexis.

From: Peter Brucia 
Sent: Freitag, 22. April 2022 15:31
To: Alexis Sarda-Espinosa 
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

No
Sent from my iPhone



Serialization in window contents and network buffers

2022-09-27 Thread Alexis Sarda-Espinosa
Hi everyone,

I know the low level details of this are likely internal, but at a high
level we can say that operators usually have some state associated with
them. Particularly for error handling and job restarts, I imagine windows
must persist state, and operators in general probably persist network
buffers for anything that hasn't been processed or emitted.

Does Flink's serialization stack with type information apply to these
cases?

I ask specifically due to a use case I'm evaluating. I would have an
interface as output of some operators, and all those outputs would go into
a windowing operator. However, each non-windowing operator would emit a
different concrete class that implements the interface.

If I add @TypeInfo annotations to the concrete implementers, would Flink
find them and use them even if the operators are defined in terms of the
interface?
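
To make the question concrete, I mean something like this (hypothetical types shown in one snippet, not the real job's classes):

import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// the interface used in the operators' signatures
interface EventLike {
    String id();
}

// a concrete implementer annotated with @TypeInfo pointing at its factory
@TypeInfo(ConcreteEvent.Factory.class)
public class ConcreteEvent implements EventLike {
    public String id;
    public long timestamp;

    @Override
    public String id() { return id; }

    public static class Factory extends TypeInfoFactory<ConcreteEvent> {
        @Override
        public TypeInformation<ConcreteEvent> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            return Types.POJO(ConcreteEvent.class);
        }
    }
}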

Regards,
Alexis.


Broadcast state and job restarts

2022-10-27 Thread Alexis Sarda-Espinosa
Hello,

The documentation for broadcast state specifies that it is always kept in
memory. My assumptions based on this statement are:

1. If a job restarts in the same Flink cluster (i.e. using a restart
strategy), the tasks' attempt number increases and the broadcast state is
restored since it's not lost from memory.
2. If the whole Flink cluster is restarted with a savepoint, broadcast
state will not be restored and I need to write my application with this in
mind.

Are these correct?

Regards,
Alexis.


Window state size with global window and custom trigger

2022-10-07 Thread Alexis Sarda-Espinosa
Hello,

I found an SO thread that clarifies some details of window state size [1].
I would just like to confirm that this also applies when using a global
window with a custom trigger.

The reason I ask is that the TriggerResult API is meant to cover all
supported scenarios, so FIRE vs FIRE_AND_PURGE is relevant, for example,
for a ProcessWindowFunction that holds all input records until it fires.
However, I assume there would be no distinction if I use a
(Rich)AggregateFunction, regardless of window type (global vs timed), but
I'd like to be sure.
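
For concreteness, the setup I have in mind is roughly this (types and the firing condition are made up; keyedStream stands for some KeyedStream<Long, ?>):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

keyedStream
    .window(GlobalWindows.create())
    .trigger(new Trigger<Long, GlobalWindow>() {
        @Override
        public TriggerResult onElement(Long element, long timestamp,
                                       GlobalWindow window, TriggerContext ctx) {
            // FIRE vs FIRE_AND_PURGE only changes stored state if the window
            // function keeps the raw elements (e.g. ProcessWindowFunction)
            return element % 100 == 0 ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) {}
    })
    // with a (Rich)AggregateFunction only the accumulator is kept as window state
    .aggregate(new AggregateFunction<Long, Long, Long>() {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Long value, Long accumulator) { return accumulator + value; }
        @Override public Long getResult(Long accumulator) { return accumulator; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    });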

Regards,
Alexis.

[1]
https://stackoverflow.com/questions/55247668/flink-window-state-size-and-state-management


Re: Window state size with global window and custom trigger

2022-10-10 Thread Alexis Sarda-Espinosa
Thanks for the confirmation :)

Regards,
Alexis.

On Sun, 9 Oct 2022, 10:37 Hangxiang Yu,  wrote:

> Hi, Alexis.
> I think you are right. It also applies for a global window with a custom
> trigger.
> If you apply a ReduceFunction or AggregateFunction, the window state size
> is usually smaller than with a ProcessWindowFunction because only the
> aggregated value is kept. It also works for global windows.
> Of course, the state size of a global window also depends on how you
> implement your trigger.
> BTW, we often use TTL to reduce the state size of the global window.
> Hope these can help you.
>
>
> On Sat, Oct 8, 2022 at 4:49 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I found an SO thread that clarifies some details of window state size
>> [1]. I would just like to confirm that this also applies when using a
>> global window with a custom trigger.
>>
>> The reason I ask is that the TriggerResult API is meant to cover all
>> supported scenarios, so FIRE vs FIRE_AND_PURGE is relevant, for example,
>> for a ProcessWindowFunction that holds all input records until it fires.
>> However, I assume there would be no distinction if I use a
>> (Rich)AggregateFunction, regardless of window type (global vs timed), but
>> I'd like to be sure.
>>
>> Regards,
>> Alexis.
>>
>> [1]
>> https://stackoverflow.com/questions/55247668/flink-window-state-size-and-state-management
>>
>>
>
> --
> Best,
> Hangxiang.
>


Partial broadcast/keyed connected streams

2022-10-11 Thread Alexis Sarda-Espinosa
Hi everyone,

I am currently thinking about a use case for a streaming job and, while I'm
fairly certain it cannot be done with the APIs that Flink currently
provides, I figured I'd put it out there in case other users think
something like this would be useful to a wider audience.

The current broadcasting mechanisms offered by Flink mention use cases
where "control events" are needed. In my case I would also have control
events, and I would need to broadcast them to *all parallel instances* of
any downstream operators that consume the events. However, some of those
operators have to be keyed because they are stateful. From the API's point
of view, I'd imagine something like

controlStream.connect(mainStream).broadcastFirstKeySecondBy(keySelector).process(PartiallyKeyedCoProcessFunction)

The function would also have something like processElement1 and
processElement2, but one of those methods wouldn't have access to
partitioned state (or could it have access to state for all key groups
handled by that instance?).

Since I'm not familiar with all of Flink's internals, I don't know if this
would be even remotely feasible, but I'd like to know if others have
opinions on this.

Regards,
Alexis.


Re: Partial broadcast/keyed connected streams

2022-10-11 Thread Alexis Sarda-Espinosa
Oh wow, I had read that documentation so many times and I was sure that API
also expected the broadcasted side to have a key like the other side, but
that's not the case; it is already what I was thinking of. Thanks.
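
For anyone finding this later, the pattern being referred to looks roughly like this (String payloads and descriptor names are placeholders; controlStream and mainStream stand for the two DataStreams):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, String> controlDesc =
        new MapStateDescriptor<>("control", String.class, String.class);

BroadcastStream<String> broadcastControl = controlStream.broadcast(controlDesc);

mainStream
    .keyBy(value -> value)  // some key selector
    .connect(broadcastControl)
    .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {
        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
            // keyed state and read-only broadcast state are available here
        }

        @Override
        public void processBroadcastElement(String value, Context ctx, Collector<String> out) {
            // broadcast state is writable here, and ctx.applyToKeyedState(...)
            // can reach the state of every key handled by this instance
        }
    });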

Regards,
Alexis.

On Wed, 12 Oct 2022, 03:42 仙路尽头谁为峰,  wrote:

> Hi Alexis:
>
>The broadcast state pattern should be done by calling connect() on
> the non-broadcasted stream, with the *broadcaststream* as an argument.
>
>And if the main stream is keyedStream, then the processElement
> function will have access to any keyed state as normal keyedstream.
>
>
>
> Best Regards!
>
> Sent from Mail for Windows <https://go.microsoft.com/fwlink/?LinkId=550986>
>
>
>
> *From: *Alexis Sarda-Espinosa 
> *Sent: *October 12, 2022 4:11
> *To: *user 
> *Subject: *Partial broadcast/keyed connected streams
>
>
>
> Hi everyone,
>
>
>
> I am currently thinking about a use case for a streaming job and, while
> I'm fairly certain it cannot be done with the APIs that Flink currently
> provides, I figured I'd put it out there in case other users think
> something like this would be useful to a wider audience.
>
>
>
> The current broadcasting mechanisms offered by Flink mention use cases
> where "control events" are needed. In my case I would also have control
> events, and I would need to broadcast them to *all parallel instances* of
> any downstream operators that consume the events. However, some of those
> operators have to be keyed because they are stateful. From the API's point
> of view, I'd imagine something like
>
>
>
>
> controlStream.connect(mainStream).broadcastFirstKeySecondBy(keySelector).process(PartiallyKeyedCoProcessFunction)
>
>
>
> The function would also have something like processElement1 and
> processElement2, but one of those methods wouldn't have access to
> partitioned state (or could it have access to state for all key groups
> handled by that instance?).
>
>
>
> Since I'm not familiar with all of Flink's internals, I don't know if this
> would be even remotely feasible, but I'd like to know if others have
> opinions on this.
>
>
>
> Regards,
>
> Alexis.
>
>
>
>
>


Broadcast state restoration for BroadcastProcessFunction

2022-10-14 Thread Alexis Sarda-Espinosa
Hello,

I wrote a test for a broadcast function to check how it handles broadcast
state during retries [1] (the gist only shows a subset of the test in
Kotlin, but it's hopefully understandable). The test will not pass unless
my function also implements CheckpointedFunction, although that interface's
method implementations can be empty - the state is empty in this case, even
though its descriptor is registered with the harness.

Is this requirement specific to the test harness API?
Otherwise BaseBroadcastProcessFunction should implement
CheckpointedFunction, maybe with empty default methods, no?

[1] https://gist.github.com/asardaes/b804b7ed04ace176881189c3d1cf842a

Regards,
Alexis.


Re: Cleanup for high-availability.storageDir

2022-12-06 Thread Alexis Sarda-Espinosa
One concrete question, under the HA folder I also see these sample entries:

- job_name/blob/job_uuid/blob_...
- job_name/submittedJobGraphX
- job_name/submittedJobGraphY

Is it safe to clean these up when the job is in a healthy state?

Regards,
Alexis.

Am Mo., 5. Dez. 2022 um 20:09 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hi Gyula,
>
> that certainly helps, but to set up automatic cleanup (in my case, of Azure
> blob storage), the ideal option would be a simple policy that deletes blobs
> that haven't been updated in some time. That would assume that anything
> that's actually relevant for the latest state is "touched" by the JM on every
> checkpoint, and since I also see blobs referencing "submitted job graphs", I
> imagine that might not be a safe assumption.
>
> I understand the life cycle of those blobs isn't directly managed by the
> operator, but in that regard it could make things more cumbersome.
>
> Ideally, Flink itself would guarantee this sort of allowable TTL for HA
> files, but I'm sure that's not trivial.
>
> Regards,
> Alexis.
>
> On Mon, 5 Dec 2022, 19:19 Gyula Fóra,  wrote:
>
>> Hi!
>>
>> There are some files that are not cleaned up over time in the HA dir that
>> need to be cleaned up by the user:
>>
>>
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
>>
>>
>> Hope this helps
>> Gyula
>>
>> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I see the number of entries in the directory configured for HA increases
>>> over time, particularly in the context of job upgrades in a Kubernetes
>>> environment managed by the operator. Would it be safe to assume that any
>>> files that haven't been updated in a while can be deleted? Assuming the
>>> checkpointing interval is much smaller than the period used to determine if
>>> files are too old.
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Re: Cleanup for high-availability.storageDir

2022-12-08 Thread Alexis Sarda-Espinosa
Hi Matthias,

I think you didn't include the mailing list in your response.

According to my experiments, using last-state means the operator simply
deletes the Flink pods, and I believe that doesn't count as Cancelled, so
the artifacts for blobs and submitted job graphs are not cleaned up. I
imagine the same logic Gyula mentioned before applies, namely keep the
latest one and clean the older ones.

Regards,
Alexis.

Am Do., 8. Dez. 2022 um 10:37 Uhr schrieb Matthias Pohl <
matthias.p...@aiven.io>:

> I see, I confused the Flink-internal recovery with what the Flink
> Kubernetes Operator does for redeploying the Flink job. AFAIU, when you do
> an upgrade of your job, the operator will cancel the Flink job (I'm
> assuming now that you use Flink's Application mode rather than Session
> mode). The operator cancelled your job and shuts down the cluster.
> Checkpoints are retained and, therefore, can be used as the so-called "last
> state" when redeploying your job using the same Job ID. In that case, the
> corresponding jobGraph and other BLOBs should be cleaned up by Flink
> itself. The checkpoint files are retained, i.e. survive the Flink cluster
> shutdown.
>
> When redeploying the Flink cluster with the (updated) job, a new JobGraph
> file is created by Flink internally. BLOBs are recreated as well. New
> checkpoints are going to be created and old ones (that are not needed
> anymore) are cleaned up.
>
> Just to recap what I said before (to make it more explicit to
> differentiate what the k8s operator does and what Flink does internally):
> Removing the artifacts you were talking about in your previous post would
> harm Flink's internal recovery mechanism. That's probably not what you want.
>
> @Gyula: Please correct me if I misunderstood something here.
>
> I hope that helped.
> Matthias
>
> On Wed, Dec 7, 2022 at 4:19 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> I see, thanks for the details.
>>
>> I do mean replacing the job without stopping it terminally. Specifically,
>> I mean updating the container image with one that contains an updated job
>> jar. Naturally, the new version must not break state compatibility, but as
>> long as that is fulfilled, the job should be able to use the last
>> checkpoint as starting point. It's my understanding that this is how the
>> Kubernetes operator's "last-state" upgrade mode works [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>
>> Regards,
>> Alexis.
>>
>> Am Mi., 7. Dez. 2022 um 15:54 Uhr schrieb Matthias Pohl <
>> matthias.p...@aiven.io>:
>>
>>> > - job_name/submittedJobGraphX
>>> submittedJobGraph* is the persisted JobGraph that would be picked up in
>>> case of a failover. Deleting this file would result in Flink's failure
>>> recovery not working properly anymore if the job is still executed but
>>> needs to be restarted because the actual job definition is gone.
>>>
>>> > completedCheckpointXYZ
>>> This is the persisted CompletedCheckpoint with a reference to the actual
>>> Checkpoint directory. Deleting this file would be problematic if the state
>>> recovery relies in some way on this specific checkpoint. The HA data relies
>>> on this file to be present. Failover only fails if there's no newer
>>> checkpoint or the HA data still refers to this checkpoint in some way.
>>>
>>> > - job_name/blob/job_uuid/blob_...
>>> Artifacts of the BlobServer containing runtime artifacts of the jobs
>>> (e.g. logs, libraries, ...)
>>>
>>> In general, you don't want to clean HA artifacts if the job hasn't
>>> reached a terminal state, yet, as it harms Flink's ability to recover the
>>> job. Additionally, these files are connected with the HA backend, i.e. the
>>> file path is stored in the HA backend. Removing the artifacts will likely
>>> result in metadata becoming invalid.
>>>
>>> What do you mean with "testing updates *without* savepoints"? Are you
>>> referring to replacing the job's business logic without stopping the job?
>>>
>>> On Wed, Dec 7, 2022 at 3:17 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Then the explanation is likely that the job has not reached a terminal
>>>> state. I was testing updates *without* savepoints (but with HA), so I guess
>>>> that never triggers automatic cleanup.

Re: Deterministic co-results

2022-12-08 Thread Alexis Sarda-Espinosa
Hi Salva,

Just to give you further thoughts from another user, I think the "temporal
join" semantics are very critical in this use case, and what you implement
for that may not easily generalize to other cases. Because of that, I'm not
sure if you can really define best practices that apply in general.
Additionally, you also have to take idleness into account, given that using
event-time could leave you in a "frozen" state if you're not receiving
events continuously.

I also doubt you can accurately estimate out-of-orderness in this scenario
due to the particularities of Flink's network stack [1]. Even if you only
have 2 sources and immediately connect them together, parallelism and the
resulting shuffles can change from one execution to the other even if you
don't change the logic at all, because scheduling is also non-deterministic
and the "distribution" of events across different parallel instances of
your sources could vary a lot as well.

I think that others will tell you that you indeed need to find a way to
buffer events for a while, I got the same advice in the past. Focusing very
specifically on what you described (streams for data & control events +
event-time + temporal join), but maybe also assuming you can manage
watermarks in a way that handles idleness without simply freezing the
stream, I once tried a custom operator (not function) to force the
watermarks of 1 stream to wait for the other one -
see PrioritizedWatermarkStreamIntervalJoinOperator in [2]. This still uses
the idea of buffering, it just moves that responsibility to the operator
that's already handling "window data" for the join. Also, it extends an
internal class, so it's very much unofficial, and it's probably too
specific to my use case, but maybe it gives you other ideas to consider.

And one last detail to further exemplify complexity here: when I was
testing my custom operator with event-time simulations in my IDE, I
initially didn't think about the fact that a watermark with Long.MAX_VALUE
is sent at the end of the simulation, which was another source of
non-determinism because sometimes the control stream was processed entirely
(including the max watermark) before a single event from the data stream
went through, which meant that all events from the data stream were
considered late arrivals and silently dropped.

[1] https://flink.apache.org/2019/06/05/flink-network-stack.html
[2] https://lists.apache.org/thread/n79tsqd5y474n5th1wdhkrh7bvlts8bs

Regards,
Alexis.

Am Do., 8. Dez. 2022 um 15:20 Uhr schrieb Salva Alcántara <
salcantara...@gmail.com>:

> Just for adding some extra references:
>
> [5]
> https://stackoverflow.com/questions/50536364/apache-flink-coflatmapfunction-does-not-process-events-in-event-time-order
> [6]
> https://stackoverflow.com/questions/61046062/controlling-order-of-processed-elements-within-coprocessfunction-using-custom-so
> [7]
> https://stackoverflow.com/questions/48692658/how-to-concatenate-two-streams-in-apache-flink/48711260#48711260
>
> Salva
>
> On 2022/12/07 18:52:42 Salva Alcántara wrote:
> > It's well-known that Flink does not provide any guarantees on the order
> in
> > which a CoProcessFunction (or one of its multiple variations) processes
> its
> > two inputs [1]. I wonder then what is the current best
> practice/recommended
> > approach for cases where one needs deterministic results in presence of:
> >
> > 1. A control stream
> > 2. An event/data stream
> >
> > Let's consider event-time semantics; both streams have timestamps, and we
> > want to implement "temporal join" semantics where the input events are
> > controlled based on the latest control signals received before them,
> i.e.,
> > the ones "active" when the events happened. For simplicity, let's assume
> > that all events are received in order, so that the only source of
> > non-determinism is the one introduced by the CoProcessFunction itself.
> >
> > I'm considering the following options:
> >
> > 1. Buffer events inside the CoProcessFunction for some time, while saving
> > all the control signals in state (indexed by time)
> > 2. Same as before but doing the pre-buffering of the event/data streams
> > before the CoProcessFunction
> > 3. Similar as before but considering both streams together by
> multiplexing
> > them into one heterogeneous stream which would be pre-sorted in order to
> > guarantee the global ordering of the events from the two different
> sources.
> > Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a
> > ProcessFunction[Either[IN1, IN2], OUT] which by construction will process
> > the data in order and hence produce deterministic results
> >
> > Essentially, all the strategies amount to introducing a "minimum amount
> of
> > delay" to guarantee the deterministic processing, which brings me to the
> > following question:
> >
> > * How to get an estimate for the out-of-order-ness bound that a
> > CoProcessFunction can introduce? Is that even possible in the first
> place?
> > I guess this 

Re: Clarification on checkpoint cleanup with RETAIN_ON_CANCELLATION

2022-12-12 Thread Alexis Sarda-Espinosa
Hi Hangxiang,

after some more digging, I think the job ID is maintained not because of
Flink HA, but because of the Kubernetes operator. It seems to me that
"savepoint" upgrade mode should ideally alter job ID when starting from the
savepoint, but I'm not sure.

Regards,
Alexis.

Am Mo., 12. Dez. 2022 um 10:31 Uhr schrieb Hangxiang Yu :

> Hi Alexis.
> IIUC, by default, the job id of the new job should be different if you
> restore from a stopped job. Whether to clean up is related to the savepoint
> restore mode.
> Just in the case of failover, the job id should not change, and everything
> in the checkpoint dir will be claimed as you said.
>
> > And a related question for a slightly different scenario, if I
> use ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION and trigger a
> stop-job-with-savepoint, does that trigger checkpoint deletion?
> In this case, the checkpoint will be cleaned and not retained and the
> savepoint will remain. So you still could use savepoint to restore.
>
> On Mon, Dec 5, 2022 at 6:33 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a doubt about a very particular scenario with this configuration:
>>
>> - Flink HA enabled (Kubernetes).
>> - ExternalizedCheckpointCleanup set to RETAIN_ON_CANCELLATION.
>> - Savepoint restore mode left as default NO_CLAIM.
>>
>> During an upgrade, a stop-job-with-savepoint is triggered, and then that
>> savepoint is used to start the upgraded job. Based on what I see, since HA
>> is enabled, the job ID doesn't change. Additionally, I understand the first
>> checkpoint after restoration will be a full one so that there's no
>> dependency on the used savepoint. However, since the job ID didn't change,
>> the new checkpoint still shares a path with "older" checkpoints, e.g.
>> /.../job_id/chk-1234.
>>
>> In this case, does this mean everything under /.../job_id/ *except*
>> shared/, taskowned/, and any chk-*/ folder whose id is smaller than 1234
>> could be deleted? I imagine even some files under shared/ could be deleted
>> as well, although that might be harder to identify.
>>
>> And a related question for a slightly different scenario, if I
>> use ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION and trigger a
>> stop-job-with-savepoint, does that trigger checkpoint deletion?
>>
>> Regards,
>> Alexis.
>>
>
>
> --
> Best,
> Hangxiang.
>

