Re: In Flink SQL for kafka avro based table, is there support for FORWARD_TRANSITIVE schema change?

2021-04-13 Thread Arvid Heise
Hi Agnelo,

How is the writer schema encoded if you are using no schema registry? Or
phrased differently: how does Flink know with which schema the data has
been written so that it can map it to the new schema?
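
(For context: plain Avro can only skip new fields or fill in missing ones if it has both schemas. A minimal sketch of that resolution step, assuming the writer schema were known somehow; the schemas and payload here are placeholders:)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class SchemaResolutionSketch {
    // Decodes a payload written with writerSchema into a record shaped by readerSchema.
    // Fields that only exist in the writer schema are skipped; fields missing from the
    // writer schema are filled from reader-schema defaults.
    public static GenericRecord decode(byte[] payload, Schema writerSchema, Schema readerSchema)
            throws java.io.IOException {
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder);
    }
}

Without a registry (or some other way to obtain the writer schema), the format only knows the table schema, so this resolution step cannot happen.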

On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta 
wrote:

> Hi, we are using Flink SQL 1.12 and have a couple of tables created from
> kafka topics. Format is avro (not confluent avro) and no schema registry as
> such.
>
> In flink 1.11 we used to specify the schema, however in 1.12 the schema is
> derived from the message itself.
>
> Is it possible for the producers to start sending new fields without
> changes in the flink app?
>
>
>
> For example:
>
> {
>   "name": "topic1",
>   "type": "record",
>   "fields": [
>     { "name": "field1", "type": "string" },
>     { "name": "field2", "type": "string" },
>     { "name": "field3", "type": "string" }
>   ]
> }
>
>
>
> Flink table has:
>
> CREATE TABLE topic1(\n"
> + " field1 string not null \n"
> + " ,field2 string not null \n"
> "'connector' = 'kafka' \n"
> + ",'topic' = 'topic1' \n"
> + ",'scan.startup.mode' = 'latest-offset' \n"
> + ",'properties.group.id' = 'topic1' \n"
> + ",'properties.bootstrap.servers' = 'localhost:8082' \n"
> + ",'properties.enable.auto.commit' = 'true' \n"
> + ",'format' = 'avro' \n";
>
>
>
> With the above settings I get a deserialization error:
>
>
>
> java.io.IOException: Failed to deserialize Avro record.
> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104) ~[flink-sql-avro-1.12.0.jar:1.12.0]
> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[flink-sql-avro-1.12.0.jar:1.12.0]
> at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.12.0.jar:1.12.0]
> at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>


In Flink SQL for kafka avro based table, is there support for FORWARD_TRANSITIVE schema change?

2021-04-13 Thread Agnelo Dcosta
Hi, we are using Flink SQL 1.12 and have a couple of tables created from
kafka topics. Format is avro (not confluent avro) and no schema registry as
such.

In flink 1.11 we used to specify the schema, however in 1.12 the schema is
derived from the message itself.

Is it possible for the producers to start sending new fields without
changes in the flink app?



For example:

{
  "name": "topic1",
  "type": "record",
  "fields": [
    { "name": "field1", "type": "string" },
    { "name": "field2", "type": "string" },
    { "name": "field3", "type": "string" }
  ]
}



Flink table has:

CREATE TABLE topic1(\n"
+ " field1 string not null \n"
+ " ,field2 string not null \n"
"'connector' = 'kafka' \n"
+ ",'topic' = 'topic1' \n"
+ ",'scan.startup.mode' = 'latest-offset' \n"
+ ",'properties.group.id' = 'topic1' \n"
+ ",'properties.bootstrap.servers' = 'localhost:8082' \n"
+ ",'properties.enable.auto.commit' = 'true' \n"
+ ",'format' = 'avro' \n";



With the above settings I get a deserialization error:



java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104) ~[flink-sql-avro-1.12.0.jar:1.12.0]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[flink-sql-avro-1.12.0.jar:1.12.0]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]


Re: NPE when aggregate window.

2021-04-13 Thread Si-li Liu
Thanks for your help.

After I replaced com.google.common.base.Objects.hashCode with
toString().hashCode(), the NPE problem is solved.
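
(For future readers: the underlying requirement is that the grouping key's hashCode/equals are stable, i.e. derived only from immutable fields. A minimal sketch of such a key class; the field names are made up, not the actual key used in this job:)

import java.util.Objects;

public final class StableKey {
    private final String name;   // all fields final, never mutated after construction
    private final long id;

    public StableKey(String name, long id) {
        this.name = name;
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof StableKey)) return false;
        StableKey other = (StableKey) o;
        return id == other.id && Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, id);   // same inputs always give the same hash
    }
}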

Arvid Heise wrote on Tue, Apr 13, 2021 at 11:40 PM:

> To second Dawids question: are all fields final or is it possible that
> their values are changing?
>
> On Tue, Apr 13, 2021 at 4:41 PM Si-li Liu  wrote:
>
>> Hi, Dawid,
>>
>> Thanks for your help. I use com.google.common.base.Objects.hashCode, pass
>> all fields to it to generate a hashcode, and the equals method also compares
>> all the fields.
>>
>> Dawid Wysakowicz wrote on Tue, Apr 13, 2021 at 8:10 PM:
>>
>>> Hi,
>>>
>>> Could you check that your grouping key has a stable hashcode and equals?
>>> It is very likely caused by an unstable hashcode and that a record with an
>>> incorrect key ends up on a wrong task manager.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 13/04/2021 08:47, Si-li Liu wrote:
>>>
>>> Hi,
>>>
>>> I encounter a weird NPE when trying to aggregate on a fixed window. If I
>>> set a small parallelism number the whole job uses only one TaskManager and this
>>> NPE will not happen. But when the job scales to two TaskManagers, the
>>> TaskManager will crash at the Create stage. The Flink version I use is 1.11.1.
>>>
>>> The NPE exception stack is:
>>>
>>> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task
>>> [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger,
>>> AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink:
>>> Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING
>>> to FAILED.
>>> java.io.IOException: Exception while applying AggregateFunction in
>>> aggregating state
>>> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(
>>> HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11
>>> -1.11.1.jar:1.11.1]
>>> at org.apache.flink.streaming.runtime.tasks.
>>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>>> .java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.
>>> 1.jar:1.11.1]
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:
>>> 1.11.1]
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1
>>> .jar:1.11.1]
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:
>>> 1.11.1]
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:
>>> 1.11.1]
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1
>>> ]
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
>>> Caused by: java.lang.NullPointerException
>>> at org.apache.flink.runtime.state.heap.StateTable.transform(
>>> StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(
>>> HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>> ... 13 more
>>> My aggregate code is
>>>
>>> public class AggregateDataEntry implements
>>> AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>>>
>>> @Override
>>> public Map<DataKey, DataIndex> createAccumulator() {
>>> return new HashMap<>();
>>> }
>>>
>>> @Override
>>> public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>>> accumulator.merge(value.f0, value.f1, DataIndex::add);
>>> return accumulator;
>>> }
>>>
>>> @Override
>>> public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>>> return accumulator;
>>> }
>>>
>>> @Override
>>> public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>>> a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>>> return b;
>>> }
>>> }
>>>
>>> Could anyone know something about this NPE, thanks!
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu


Re: JSON source for pyflink stream

2021-04-13 Thread Yik San Chan
Hi Giacomo,

I think you can try using the Flink SQL connectors. For JSON input such as
{"a": 1, "b": {"c": 2, "d": {"e": 3}}}, you can do:

CREATE TABLE data (
  a INT,
  b ROW<c INT, d ROW<e INT>>
) WITH (...)
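
Applied to the tweet JSON from your mail, a rough sketch in Java could look like the following (untested; only a few of the fields are typed out, the connector and path are placeholders, and I believe the SQL filesystem connector can read 'json' format in 1.12, but please double-check):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TweetTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Nested JSON objects map to ROW types, JSON arrays to ARRAY<ROW<...>>.
        tEnv.executeSql(
            "CREATE TABLE tweets (" +
            "  `data` ROW<`text` STRING, author_id STRING, id STRING, created_at STRING, lang STRING>," +
            "  includes ROW<users ARRAY<ROW<id STRING, name STRING, username STRING>>>" +
            ") WITH (" +
            "  'connector' = 'filesystem'," +            // placeholder: point this at your local files
            "  'path' = 'file:///path/to/tweets'," +
            "  'format' = 'json'" +
            ")");
    }
}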

Let me know if that helps.

Best,
Yik San

On Wed, Apr 14, 2021 at 2:00 AM  wrote:

> Hi,
> I'm new to Flink and I am trying to create a stream from locally
> downloaded tweets. The tweets are in json format, like in this example:
>
> {"data":{"text":"Polsek Kakas Cegah Covid-19 https://t.co/ADjEgpt7bC
> 
> ","public_metrics":"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
> "author_id":"1367839185764151302","id":"1378275866279469059","created_at":"2021-04-03T09:19:08.000Z","source":"Twitter
> for Android","lang":"in"},
> "includes":{"users":[{"protected":false,"id":"1367839185764151302","name":"Nathan
> Pareda","created_at":"2021-03-05T14:07:56.000Z",
>
> "public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},
>
> "username":"NathanPareda"}]},"matching_rules":[{"id":1378112825051246596,"tag":"coronavirus"}]}
>
> I would like to do it in Python using Pyflink, but could also use Java if
> there is no reasonable way to do it in Python. I've been looking at
> different options for loading these objects into a stream, but am not sure
> what to do. Here's my situation so far:
>
> 1. There doesn't seem to be a fitting connector. The filesystem-connector
> doesn't seem to support json format.
> 2. I've seen in the archive of this mailing list that some recommend to
> use the Table API. But I am not sure if this is a viable option given how
> nested the json objects are.
> 3. I could of course try to implement a custom DataSource, but that seems
> to be quite difficult so I'd only consider this if there's no other way.
>
> I'll be very grateful for any kind of input.
> Cheers,
> Giacomo
>
>


Re: Clarification about Flink's managed memory and metric monitoring

2021-04-13 Thread Xintong Song
These metrics should also be available via REST.

You can check the original design doc [1] for which metrics the UI is using.

Thank you~

Xintong Song


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A+Add+More+Metrics+to+TaskManager
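
For example, a request along these lines should return the managed memory numbers shown in the new UI (metric ids as listed on the FLIP-102 page; please double-check the exact names against your Flink version):

/taskmanagers/<tm-id>/metrics?get=Status.Flink.Memory.Managed.Used,Status.Flink.Memory.Managed.Total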

On Tue, Apr 13, 2021 at 9:08 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hi Xintong,
>
>
>
> Thanks for the info. Is there any way to access these metrics outside of
> the UI? I suppose Flink’s reporters might provide them, but will they also
> be available through the REST interface (or another interface)?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Xintong Song 
> *Sent:* Tuesday, 13 April 2021 14:30
> *To:* Alexis Sarda-Espinosa 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Clarification about Flink's managed memory and metric
> monitoring
>
>
>
> Hi Alexis,
>
>
>
> First of all, I strongly recommend not to look into the JVM metrics. These
> metrics are fetched directly from JVM and do not well correspond to Flink's
> memory configurations. They were introduced a long time ago and are
> preserved mostly for compatibility. IMO, they bring more confusion than
> convenience. In Flink-1.12, there is a newly designed TM metrics page in
> the web ui, which clearly shows how the metrics correspond to Flink's
> memory configurations (if any).
>
>
>
> Concerning your questions.
>
> 1. Yes, increasing framework/task off-heap memory sizes should increase
> the direct memory capacity. Increasing the network memory size should also
> do that.
>
> 2. When 'state.backend.rocksdb.memory.managed' is true, RocksDB uses
> managed memory. Managed memory is not measured by any JVM metrics. It's not
> managed by JVM, meaning that it's not limited by '-XX:MaxDirectMemorySize'
> and is not controlled by the garbage collectors.
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Tue, Apr 13, 2021 at 7:53 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> Hello,
>
>
>
> I have a Flink TM configured with taskmanager.memory.managed.size: 1372m.
> There is a streaming job using RocksDB for checkpoints, so I assume some of
> this memory will indeed be used.
>
>
>
> I was looking at the metrics exposed through the REST interface, and I
> queried some of them:
>
>
>
> /taskmanagers/c3c960d79c1eb2341806bfa2b2d66328/metrics?get=Status.JVM.Memory.Heap.Committed,Status.JVM.Memory.NonHeap.Committed,Status.JVM.Memory.Direct.MemoryUsed | jq
>
> [
>   { "id": "Status.JVM.Memory.Heap.Committed",       "value": "1652031488" },
>   { "id": "Status.JVM.Memory.NonHeap.Committed",    "value": "234291200" },    <- 223 MiB
>   { "id": "Status.JVM.Memory.Direct.MemoryUsed",    "value": "375015427" },    <- 358 MiB
>   { "id": "Status.JVM.Memory.Direct.TotalCapacity", "value": "375063552" }     <- 358 MiB
> ]
>
>
>
> I presume direct memory is being used by Flink and its networking stack,
> as well as by the JVM itself. To be sure:
>
>
>
>1. Increasing "taskmanager.memory.framework.off-heap.size" or
>"taskmanager.memory.task.off-heap.size" should increase
>Status.JVM.Memory.Direct.TotalCapacity, right?
>2. I presume the native memory used by RocksDB cannot be tracked with
>these JVM metrics even if "state.backend.rocksdb.memory.managed" is true,
>right?
>
>
>
> Based on this question:
> https://stackoverflow.com/questions/30622818/off-heap-native-heap-direct-memory-and-native-memory,
> I imagine Flink/RocksDB either allocates memory completely independently of
> the JVM, or it uses unsafe. Since the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html#managed-memory)
> states that "Managed memory is managed by Flink and is allocated as native
> memory (off-heap)", I thought this native memory might show up as part of
> direct memory tracking, but I guess it doesn’t.
>
>
>
> Regards,
>
> Alexis.
>
>
>
>


Re: [External] : Re: Conflict in the document - About native Kubernetes per job mode

2021-04-13 Thread Fuyao Li
Hello Yang,

I also created a PR for this issue. Please take a look.
Refer to https://github.com/apache/flink/pull/15602

Thanks,
Fuyao

From: Fuyao Li 
Date: Tuesday, April 13, 2021 at 18:23
To: Yang Wang 
Cc: user 
Subject: Re: [External] : Re: Conflict in the document - About native 
Kubernetes per job mode
Hello Yang,

I tried to create a ticket 
https://issues.apache.org/jira/browse/FLINK-22264
I just registered as a user and I can’t find a place to assign the task to 
myself… Any idea on this jira issue?

Thanks.

Best,
Fuyao

From: Yang Wang 
Date: Tuesday, April 13, 2021 at 03:01
To: Fuyao Li 
Cc: user , Yan Wang 
Subject: Re: [External] : Re: Conflict in the document - About native 
Kubernetes per job mode
I think it makes sense to have such a simple fix.

Could you please create a ticket and attach a PR?

Best,
Yang

Fuyao Li <fuyao...@oracle.com> wrote on Tue, Apr 13, 2021 at 2:24 PM:
Hello Yang,

It is very kind of you to give such a detailed explanation! Thanks for 
clarification.

For the small document fix I mentioned, what do you think?

Best,
Fuyao

From: Yang Wang <danrtsey...@gmail.com>
Date: Monday, April 12, 2021 at 23:03
To: Fuyao Li <fuyao...@oracle.com>
Cc: user <user@flink.apache.org>, Yan Wang <y.yan.w.w...@oracle.com>
Subject: [External] : Re: Conflict in the document - About native Kubernetes 
per job mode
Hi Fuyao,

Currently, Flink only supports per-job mode for Yarn. The standalone job cluster
has been replaced with standalone application mode
after FLIP-85[1]. Both standalone Flink on K8s and native K8s integration do 
not support per-job mode.

In your attached video, it is a PoC implementation for presentation. We have 
introduced the Kubernetes application mode in release 1.11
and agree to not have a per-job cluster support. The only reason is that it is 
not very convenient to ship the job graphs, user jars, artifacts in
Kubernetes environment.

[1]. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode

Best,
Yang

Fuyao Li <fuyao...@oracle.com> wrote on Tue, Apr 13, 2021 at 8:10 AM:
Hello Community, Yang,

I noticed a conflict in the document for per-job mode support for Kubernetes.
In the doc here [1], it mentions
in a Flink Job Cluster, the available cluster manager (like YARN or Kubernetes) 
is used to spin up a cluster for each submitted job and this cluster is 
available to that job only.
It implies per job mode is supported in Kubernetes.

However, in the docs [2] and [3], it clearly points out per-job mode is not 
supported in Kubernetes.

This is a conflicting statement and is kind of misleading. If needed, I can create
an MR to delete the statement in [1] for Kubernetes. It is a small fix.

I also noticed another thing in the video [4] at 25:08. Yang, you are executing
a command with the -e kubernetes-per-job flag. I tried it and found that such a command is
not supported in the Flink distribution at all. I noticed the version you are using
is 1.11-snapshot during the demo. Are you modifying the source code and
generating an internal version of Flink?


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#flink-job-cluster
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#per-job-cluster-mode
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#per-job-cluster-mode
[4] 
https://www.youtube.com/watch?v=pdFPr_VOWTU&t=833s

Best,
Fuyao


Re: [External] : Re: Conflict in the document - About native Kubernetes per job mode

2021-04-13 Thread Fuyao Li
Hello Yang,

I tried to create a ticket https://issues.apache.org/jira/browse/FLINK-22264
I just registered as a user and I can’t find a place to assign the task to 
myself… Any idea on this jira issue?

Thanks.

Best,
Fuyao

From: Yang Wang 
Date: Tuesday, April 13, 2021 at 03:01
To: Fuyao Li 
Cc: user , Yan Wang 
Subject: Re: [External] : Re: Conflict in the document - About native 
Kubernetes per job mode
I think it makes sense to have such a simple fix.

Could you please create a ticket and attach a PR?

Best,
Yang

Fuyao Li <fuyao...@oracle.com> wrote on Tue, Apr 13, 2021 at 2:24 PM:
Hello Yang,

It is very kind of you to give such a detailed explanation! Thanks for 
clarification.

For the small document fix I mentioned, what do you think?

Best,
Fuyao

From: Yang Wang <danrtsey...@gmail.com>
Date: Monday, April 12, 2021 at 23:03
To: Fuyao Li <fuyao...@oracle.com>
Cc: user <user@flink.apache.org>, Yan Wang <y.yan.w.w...@oracle.com>
Subject: [External] : Re: Conflict in the document - About native Kubernetes 
per job mode
Hi Fuyao,

Currently, Flink only supports per-job mode for Yarn. The standalone job cluster
has been replaced with standalone application mode
after FLIP-85[1]. Both standalone Flink on K8s and native K8s integration do 
not support per-job mode.

In your attached video, it is a PoC implementation for presentation. We have 
introduced the Kubernetes application mode in release 1.11
and agree to not have a per-job cluster support. The only reason is that it is 
not very convenient to ship the job graphs, user jars, artifacts in
Kubernetes environment.

[1]. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode

Best,
Yang

Fuyao Li <fuyao...@oracle.com> wrote on Tue, Apr 13, 2021 at 8:10 AM:
Hello Community, Yang,

I noticed a conflict in the document for per-job mode support for Kubernetes.
In the doc here [1], it mentions
in a Flink Job Cluster, the available cluster manager (like YARN or Kubernetes) 
is used to spin up a cluster for each submitted job and this cluster is 
available to that job only.
It implies per job mode is supported in Kubernetes.

However, in the docs [2] and [3], it clearly points out per-job mode is not 
supported in Kubernetes.

This is a conflicting statement and is kind of misleading. If needed, I can create
an MR to delete the statement in [1] for Kubernetes. It is a small fix.

I also noticed another thing in the video [4] at 25:08. Yang, you are executing
a command with the -e kubernetes-per-job flag. I tried it and found that such a command is
not supported in the Flink distribution at all. I noticed the version you are using
is 1.11-snapshot during the demo. Are you modifying the source code and
generating an internal version of Flink?


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#flink-job-cluster
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#per-job-cluster-mode
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#per-job-cluster-mode
[4] 
https://www.youtube.com/watch?v=pdFPr_VOWTU&t=833s

Best,
Fuyao


Extract/Interpret embedded byte data from a record

2021-04-13 Thread Sumeet Malhotra
Hi,

I'm reading data from Kafka, which is Avro encoded and has the following
general schema:

{
  "name": "SomeName",
  "doc": "Avro schema with variable embedded encodings",
  "type": "record",
  "fields": [
{
  "name": "Name",
  "doc": "My name",
  "type": "string"
},
{
  "name": "ID",
  "doc": "My ID",
  "type": "string"
},
{
  "name": "Result",
  "doc": "Result data, could be encoded differently",
  "type": "bytes"
},
{
  "name": "ResultEncoding",
  "doc": "Result encoding media type (e.g. application/avro,
application/json)",
  "type": "string"
},
  ]
}

Basically, the "Result" field is bytes whose interpretation depends upon
the "ResultEncoding" field i.e. either avro or json. The "Result" byte
stream has its own well defined schema also.

My use case involves extracting/aggregating data from within the embedded
"Result" field. What would be the best approach to perform this runtime
decoding and extraction of fields from the embedded byte data? Would user
defined functions help in this case?
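
To make the question concrete, this is roughly the kind of UDF I have in mind (only a sketch; the inner schema and the extracted field name are invented):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.table.functions.ScalarFunction;

public class ExtractFromResult extends ScalarFunction {
    // Hypothetical schema of the embedded "Result" payload.
    private static final Schema INNER_SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Inner\",\"fields\":[{\"name\":\"value\",\"type\":\"string\"}]}");
    private transient ObjectMapper mapper;

    // Dispatches on the media type carried in "ResultEncoding" and pulls out one field.
    public String eval(byte[] result, String resultEncoding) throws Exception {
        if ("application/json".equals(resultEncoding)) {
            if (mapper == null) { mapper = new ObjectMapper(); }
            return mapper.readTree(result).get("value").asText();
        } else { // assume application/avro, binary-encoded with INNER_SCHEMA
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(INNER_SCHEMA);
            GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(result, null));
            return record.get("value").toString();
        }
    }
}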

Thanks in advance!
Sumeet


Re: Avro schema

2021-04-13 Thread Sumeet Malhotra
Hi Arvid,

I certainly appreciate the points you make regarding schema evolution.
Actually, I did end up writing an avro2sql script to autogen the DDL in the
end.
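
In case it helps anyone else searching the archives, the core of it is not much more than this (a simplified sketch, not the actual script; it only handles flat records with a few primitive types):

import org.apache.avro.Schema;

public class Avro2Sql {
    public static String createTableDdl(String tableName, Schema record) {
        StringBuilder ddl = new StringBuilder("CREATE TABLE " + tableName + " (\n");
        for (Schema.Field field : record.getFields()) {
            ddl.append("  `").append(field.name()).append("` ")
               .append(toSqlType(field.schema())).append(",\n");
        }
        ddl.setLength(ddl.length() - 2); // drop the trailing comma
        return ddl.append("\n) WITH (...)").toString();
    }

    private static String toSqlType(Schema schema) {
        switch (schema.getType()) {
            case STRING:  return "STRING";
            case INT:     return "INT";
            case LONG:    return "BIGINT";
            case DOUBLE:  return "DOUBLE";
            case BOOLEAN: return "BOOLEAN";
            case BYTES:   return "BYTES";
            default: throw new IllegalArgumentException("unhandled type: " + schema.getType());
        }
    }
}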

Thanks,
Sumeet

On Fri, Apr 9, 2021 at 12:13 PM Arvid Heise  wrote:

> Hi Sumeet,
>
> The beauty of Avro lies in having reader and writer schema and schema
> compatibility, such that if your schema evolves over time (which will
> happen in streaming naturally but is also very common in batch), you can
> still use your application as is without modification. For streaming, this
> methodology also implies that you can process elements with different
> schema versions in the same run, which is mandatory for any non-toy example.
>
> If you read into this topic, you will realize that it doesn't make sense
> to read from Avro without specifying your reader schema (except for some
> generic applications, but they should be written in DataStream). If you
> keep in mind that your same dataset could have different schemas, you will
> notice that your ideas quickly reach some limitations (which schema to
> take?). What you could do, is to write a small script to generate the
> schema DDL from your current schema in your actual data if you have very
> many columns and datasets. It certainly would also be an interesting idea
> to pass a static Avro/Json schema to the DDL.
>
> On Fri, Apr 2, 2021 at 10:57 AM Paul Lam  wrote:
>
>> Hi Sumeet,
>>
>> I’m not a Table/SQL API expert, but from my knowledge, it’s not viable to
>> derive SQL table schemas from Avro schemas, because table schemas would be
>> the ground truth by design.
>> Moreover, one Avro type can be mapped to multiple Flink types, so in
>> practice maybe it’s also not viable.
>>
>> Best,
>> Paul Lam
>>
>> On Apr 2, 2021, at 11:34, Sumeet Malhotra wrote:
>>
>> Just realized, my question was probably not clear enough. :-)
>>
>> I understand that the Avro (or JSON for that matter) format can be
>> ingested as described here:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#apache-avro-format,
>> but this still requires the entire table specification to be written in the
>> "CREATE TABLE" section. Is it possible to just specify the Avro schema and
>> let Flink map it to an SQL table?
>>
>> BTW, the above link is titled "Table API Legacy Connectors", so is this
>> still supported? Same question for YAML specification.
>>
>> Thanks,
>> Sumeet
>>
>> On Fri, Apr 2, 2021 at 8:26 AM Sumeet Malhotra 
>> wrote:
>>
>>> Hi,
>>>
>>> Is it possible to directly import Avro schema while ingesting data into
>>> Flink? Or do we always have to specify the entire schema in either SQL DDL
>>> for Table API or using DataStream data types? From a code maintenance
>>> standpoint, it would be really helpful to keep one source of truth for the
>>> schema somewhere.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>
>>


Re: Flink docker 1.11.3 actually runs 1.11.2

2021-04-13 Thread Flavio Pompermaier
Hi Chesnay,
my tests were done using docker-compose (with the command 'docker-compose
up --build -d flink-jobmanager flink-taskmanager').
These are the necessary files (./flink/db-libs/* contains the jdbc
libraries I use, while /opt/flink/data is used as a volume to share files
with other containers):
PS: before, I used FLINK_VERSION=1.11-scala_2.12-java11 in the .env
file... so if you didn't encounter the problem, it's probably caused by some
docker-compose internal that is not overriding the old image/container (I'm
indeed relatively new to docker and docker-compose.)
---
.env
---
FLINK_VERSION=1.11.3-scala_2.12-java11


docker-compose.yml


version: '3'
services:
  flink-jobmanager:
container_name: flink-jobmanager
build:
  context: .
  dockerfile: Dockerfile.flink
  args:
- FLINK_VERSION=${FLINK_VERSION}
image: 'flink-test:${FLINK_VERSION}'
ports:
  - "8091:8081"
  - "8092:8082"
command: jobmanager
environment:
  - |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
rest.port: 8081
historyserver.web.port: 8082
web.upload.dir: /opt/flink
volumes:
  - '/opt/flink/data:/tmp/flink'
networks:
  - test-network
  flink-taskmanager:
container_name: flink-taskmanager
build:
  context: .
  dockerfile: Dockerfile.flink
  args:
- FLINK_VERSION=${FLINK_VERSION}
image: 'flink-test:${FLINK_VERSION}'
depends_on:
  - flink-jobmanager
command: taskmanager
environment:
  - |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 2
volumes:
  - '/opt/flink/data:/tmp/flink'
networks:
  - test-network

networks:
  test-network:
driver: bridge

---
Dockerfile.flink
---

ARG FLINK_VERSION
FROM flink:$FLINK_VERSION

USER root
RUN set -ex; apt-get update; apt-get -y install openssh-client ssh # python
USER flink
WORKDIR /opt/flink

COPY flink/db-libs/*   lib/

Thanks for the support,
Flavio

On Tue, Apr 13, 2021 at 7:43 PM Chesnay Schepler  wrote:

> Please provide steps to reproduce the issue.
>
> I can't see anything wrong in the dockerfiles (they reference the
> correct release url), and the referenced release correctly identifies
> itself as 1.11.3 .
> I also started a container with the image, started a jobmanager, and the
> logs show 1.11.3 like they are supposed to do.
>
> On 4/13/2021 6:31 PM, Flavio Pompermaier wrote:
> > Hi to all,
> > I've just build a docker that use the image
> > flink:1.11.3-scala_2.12-java11 but the web UI (and logs too) display
> > Flink 1.11.2 (Commit: fe36135). Was there an error with the release?
> >
> > Best,
> > Flavio
>
>
>

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809


Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-04-13 Thread Lu Niu
FYI, my teammate Chen posted a similar question on the Apache Flink mailing
list: "handle SUSPENDED in ZooKeeperLeaderRetrievalService".
That is the root cause of the problem.
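
(For anyone hitting this later: the client-side ZooKeeper timeouts Xintong mentions further down are configurable in flink-conf.yaml; the values shown here are just the defaults:)

high-availability.zookeeper.client.session-timeout: 60000
high-availability.zookeeper.client.connection-timeout: 15000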



On Wed, Mar 31, 2021 at 2:01 PM Lu Niu  wrote:

> Hi, Colletta
>
> Thanks for sharing! Do you mind share one stacktrace for that error as
> well? Thanks!
>
> Best
> Lu
>
> On Sat, Mar 27, 2021 at 5:36 AM Colletta, Edward 
> wrote:
>
>>
>>
>> FYI, we experienced a similar error again: lost leadership, but not due to
>> a timeout but a disconnect from zookeeper.  This time I examined logs for
>> other errors related to zookeeper and found the kafka cluster that uses the
>> same zookeeper also was disconnected.
>>
>>
>>
>> We run on AWS and this seems to be AWS related.
>>
>>
>>
>>
>>
>> *From:* Xintong Song 
>> *Sent:* Sunday, January 31, 2021 9:23 PM
>> *To:* user 
>> *Subject:* Re: Flink 1.11 job hit error "Job leader lost leadership" or
>> "ResourceManager leader changed to new address null"
>>
>>
>>
>>
>>
>>
>> Hi Colletta,
>>
>>
>>
>> This error is kind of expected if the JobMaster / ResourceManager does
>> not maintain a stable connection to the ZooKeeper service, which may be
>> caused by network issues, GC pause, or unstable ZK service availability.
>>
>>
>>
>> By "similar issue", what I meant is I'm not aware of any issue related to
>> the upgrading of the ZK version that may cause the leadership loss.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>>
>>
>> On Sun, Jan 31, 2021 at 4:14 AM Colletta, Edward 
>> wrote:
>>
>> “but I'm not aware of any similar issue reported since the upgrading”
>>
>> For the record, we experienced this same error on Flink 1.11.2 this past
>> week.
>>
>>
>>
>> *From:* Xintong Song 
>> *Sent:* Friday, January 29, 2021 7:34 PM
>> *To:* user 
>> *Subject:* Re: Flink 1.11 job hit error "Job leader lost leadership" or
>> "ResourceManager leader changed to new address null"
>>
>>
>>
>>
>>
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>>
>>
>> On Sat, Jan 30, 2021 at 8:27 AM Xintong Song 
>> wrote:
>>
>> There's indeed a ZK version upgrading during 1.9 and 1.11, but I'm not
>> aware of any similar issue reported since the upgrading.
>>
>> I would suggest the following:
>>
>> - Turn on the DEBUG log see if there's any valuable details
>>
>> - Maybe try asking in the Apache Zookeeper community, see if this is a
>> known issue.
>>
>>
>> Thank you~
>> Xintong Song
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Sat, Jan 30, 2021 at 6:47 AM Lu Niu  wrote:
>>
>> Hi, Xintong
>>
>>
>>
>> Thanks for replying. Could it relate to the zk version? We are a platform
>> team at Pinterest in the middle of migrating form 1.9.1 to 1.11. Both 1.9
>> and 1.11 jobs talking to the same ZK for JM HA. This problem only surfaced
>> in 1.11 jobs. That's why we think it is related to version upgrade.
>>
>>
>>
>> Best
>>
>> Lu
>>
>>
>>
>> On Thu, Jan 28, 2021 at 7:56 PM Xintong Song 
>> wrote:
>>
>> The ZK client side uses 15s connection timeout and 60s session timeout
>> in Flink. There's nothing similar to a heartbeat interval configured, which
>> I assume is up to ZK's internal implementation. These things have not
>> changed in FLink since at least 2017.
>>
>>
>>
>> If both ZK client and server complain about timeout, and there's no gc
>> issue spotted, I would consider a network instability.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>>
>>
>> On Fri, Jan 29, 2021 at 3:15 AM Lu Niu  wrote:
>>
>> After checking the log I found the root cause is zk client timeout on TM:
>>
>> ```
>>
>> 2021-01-25 14:01:49,600 WARN
>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
>> session timed out, have not heard from server in 40020ms for sessionid
>> 0x404f9ca531a5d6f
>> 2021-01-25 14:01:49,610 INFO
>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
>> session timed out, have not heard from server in 40020ms for sessionid
>> 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
>> 2021-01-25 14:01:49,711 INFO
>> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
>> - State change: SUSPENDED
>> 2021-01-25 14:01:49,711 WARN
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>> ZooKeeper.
>> 2021-01-25 14:01:49,712 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job
>> 27ac39342913d29baac4cde13062c4a4 with leader id
>> b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
>> 2021-01-25 14:01:49,712 WARN
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetr

JSON source for pyflink stream

2021-04-13 Thread G . G . M . 5611
Hi,
I'm new to Flink and I am trying to create a stream from locally downloaded tweets. The tweets are in json format, like in this example:
 
{"data":{"text":"Polsek Kakas Cegah Covid-19 https://t.co/ADjEgpt7bC","public_metrics":"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
"author_id":"1367839185764151302","id":"1378275866279469059","created_at":"2021-04-03T09:19:08.000Z","source":"Twitter for Android","lang":"in"},
"includes":{"users":[{"protected":false,"id":"1367839185764151302","name":"Nathan Pareda","created_at":"2021-03-05T14:07:56.000Z",
"public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},
"username":"NathanPareda"}]},"matching_rules":[{"id":1378112825051246596,"tag":"coronavirus"}]}
 
I would like to do it in Python using Pyflink, but could also use Java if there is no reasonable way to do it in Python. I've been looking at different options for loading these objects into a stream, but am not sure what to do. Here's my situation so far:
 
1. There doesn't seem to be a fitting connector. The filesystem-connector doesn't seem to support json format.
2. I've seen in the archive of this mailing list that some recommend to use the Table API. But I am not sure if this is a viable option given how nested the json objects are.
3. I could of course try to implement a custom DataSource, but that seems to be quite difficult so I'd only consider this if there's no other way.

I'll be very grateful for any kind of input.
Cheers,
Giacomo
 


Re: Flink docker 1.11.3 actually runs 1.11.2

2021-04-13 Thread Chesnay Schepler

Please provide steps to reproduce the issue.

I can't see anything wrong in the dockerfiles (they reference the 
correct release url), and the referenced release correctly identifies 
itself as 1.11.3 .
I also started a container with the image, started a jobmanager, and the 
logs show 1.11.3 like they are supposed to do.


On 4/13/2021 6:31 PM, Flavio Pompermaier wrote:

Hi to all,
I've just build a docker that use the image 
flink:1.11.3-scala_2.12-java11 but the web UI (and logs too) display 
Flink 1.11.2 (Commit: fe36135). Was there an error with the release?


Best,
Flavio





Flink docker 1.11.3 actually runs 1.11.2

2021-04-13 Thread Flavio Pompermaier
Hi to all,
I've just build a docker that use the image flink:1.11.3-scala_2.12-java11
but the web UI (and logs too) display Flink 1.11.2 (Commit: fe36135). Was
there an error with the release?

Best,
Flavio


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-13 Thread Arvid Heise
Hi Rahul,

Checkpointing is Flink's way of providing processing guarantees "at least
once"/"exactly once". So your question is like asking if a car offers any
safety without you wanting to use a built-in belt and airbags. Sure you
could install your own safety features but chances are that your solution
isn't working as well as the built-in stuff.
I also see little use in adding a different way to Flink in general. Flink is
already complex enough; the community and newer developers suffer more from
having too many (not so well documented) options than from having too few
options.

With that being said, let's see how you can build your own checkpointing.

I haven't explored this approach. Wouldn't the backpressure get propagated
> upstream and the consumption rate from Kafka get affected by it?

Yes, and that's what you have now as well. You just have two sources of
backpressure (window + async I/O). You can combine it into one source and
reduce state significantly. But tbh I haven't fully understood your window
operator and why it's necessary at all. It sounds a bit like artificially
slowing down your pipeline.

I am new to delegating sink. Can you please help me to understand how this
> approach helps in failures or recovery to replay records from Kafka?
> Currently, I am using auto offset commit of Kafka Consumer, which I think
> commits offsets every 5 seconds, by default.
>
Afaik auto-commit commits the offsets while fetching a new batch of records.

The delegating sink would work more like Kafka Streams; it would only
commit those records that have been written. Let's say you have read record
A, B, C and you are now writing A, you would only commit the offset of A.
On failure and recovery, you would only read B, C since they are
uncommitted.

The downside of this approach is that your throughput will always be
limited, as the fine-grained commit costs you quite a bit of performance (you
need to wait for the acknowledgement, so a full TCP roundtrip + Kafka broker commit
time + Kafka broker replication time).
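
To illustrate what I mean by a delegating sink (just a sketch; OffsetAware and OffsetCommitter are made-up interfaces, and attaching offsets to the records as well as the actual Kafka commit call are left out):

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class DelegatingSink<T extends DelegatingSink.OffsetAware> extends RichSinkFunction<T> {

    public interface OffsetAware { int partition(); long offset(); }
    public interface OffsetCommitter extends java.io.Serializable { void commit(int partition, long offset); }

    private final SinkFunction<T> inner;
    private final OffsetCommitter committer;

    public DelegatingSink(SinkFunction<T> inner, OffsetCommitter committer) {
        this.inner = inner;
        this.committer = committer;
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        inner.invoke(value, context);                          // write the record first
        committer.commit(value.partition(), value.offset());   // only then mark its offset as done
    }
    // Note: a real version would also forward open()/close() to the inner sink if it is rich.
}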

On Tue, Apr 13, 2021 at 5:38 PM Rahul Patwari 
wrote:

> Hi Arvid,
>
> Thanks for your inputs. They are super helpful.
>
> Why do you need the window operator at all? Couldn't you just backpressure
>> on the async I/O by delaying the processing there?
>>
>
> I haven't explored this approach. Wouldn't the backpressure get
> propagated upstream and the consumption rate from Kafka get affected by it?
>
> What's keeping you from attaching the offset of the Kafka records to A, B,
>> C and write the offset when writing the record into the sink? (Probably
>> need to wrap your sink function into a delegating sink function)
>>
>
> I am new to delegating sink. Can you please help me to understand how this
> approach helps in failures or recovery to replay records from Kafka?
> Currently, I am using auto offset commit of Kafka Consumer, which I think
> commits offsets every 5 seconds, by default.
>
> Checkpointing will definitely solve the problem. But, replaying records
> from Kafka seems like a simpler approach to guarantee "at least once"
> processing throughout the life of the stateless pipeline. Replaying records
> from Kafka also seems like a simpler approach operationally.
>
> Is there no other way to guarantee "at least once" processing without
> checkpointing?
>
> Checkpointing seems to be the only approach to guarantee "at least
> once"/"exactly once" processing for stateful pipelines. But support for
> replaying records to guarantee "at least once" processing would be helpful.
>
> I know that Checkpointing has "at least once" mode. Probably we can add
> one more mode where records are replayed from Source and State is not
> checkpointed. Just a Suggestion. What are your thoughts? In this case, this
> approach will be very helpful where only Kafka offsets are checkpointed.
>
> Thanks,
> Rahul
>
>
> On Tue, Apr 13, 2021 at 7:20 PM Arvid Heise  wrote:
>
>> Hi Rahul,
>>
>> This pipeline should process millions of records per day with low
>>> latency.
>>> I am avoiding Checkpointing, as the records in the Window operator and
>>> in-flight records in the Async I/O operator are persisted along with the
>>> Kafka offsets. But the records in Window and Async I/O operators can be
>>> obtained just by replaying the records from Kafka Source, which is the
>>> approach I want to take.
>>> There is Deduplication logic in the pipeline. So, I am preferring to
>>> replay records in case of failures rather than storing the incremental
>>> snapshots.
>>>
>> Did you measure how much data is being checkpointed? A few million
>> records per day should just be a few megabytes per checkpoint.
>> It sounds to me as if you would rather want your approach because it's
>> conceptually more sound but not because it is necessary.
>>
>> I'm asking because delaying records and using async I/O is increasing
>> your latency significantly anyways. So a couple of additional ms to the
>> checkpoint don't sound like it will invalidate your use case

Re: NPE when aggregate window.

2021-04-13 Thread Arvid Heise
To second Dawids question: are all fields final or is it possible that
their values are changing?

On Tue, Apr 13, 2021 at 4:41 PM Si-li Liu  wrote:

> Hi, Dawid,
>
> Thanks for your help. I use com.google.common.base.Objects.hashCode, pass
> all fields to it to generate a hashcode, and the equals method also compares
> all the fields.
>
> Dawid Wysakowicz wrote on Tue, Apr 13, 2021 at 8:10 PM:
>
>> Hi,
>>
>> Could you check that your grouping key has a stable hashcode and equals?
>> It is very likely caused by an unstable hashcode and that a record with an
>> incorrect key ends up on a wrong task manager.
>>
>> Best,
>>
>> Dawid
>> On 13/04/2021 08:47, Si-li Liu wrote:
>>
>> Hi,
>>
>> I encounter a weird NPE when trying to aggregate on a fixed window. If I
>> set a small parallelism number the whole job uses only one TaskManager and this
>> NPE will not happen. But when the job scales to two TaskManagers, the
>> TaskManager will crash at the Create stage. The Flink version I use is 1.11.1.
>>
>> The NPE exception stack is:
>>
>> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task
>> [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger,
>> AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink:
>> Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING
>> to FAILED.
>> java.io.IOException: Exception while applying AggregateFunction in
>> aggregating state
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(
>> HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-
>> 1.11.1.jar:1.11.1]
>> at org.apache.flink.streaming.runtime.tasks.
>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>> .java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1
>> .jar:1.11.1]
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:
>> 1.11.1]
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>> .processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1
>> .jar:1.11.1]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>> StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:
>> 1.11.1]
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:
>> 1.11.1]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
>> Caused by: java.lang.NullPointerException
>> at org.apache.flink.runtime.state.heap.StateTable.transform(
>> StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(
>> HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> ... 13 more
>> My aggregate code is
>>
>> public class AggregateDataEntry implements
>> AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>>
>> @Override
>> public Map<DataKey, DataIndex> createAccumulator() {
>> return new HashMap<>();
>> }
>>
>> @Override
>> public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>> accumulator.merge(value.f0, value.f1, DataIndex::add);
>> return accumulator;
>> }
>>
>> @Override
>> public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>> return accumulator;
>> }
>>
>> @Override
>> public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>> a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>> return b;
>> }
>> }
>>
>> Could anyone know something about this NPE, thanks!
>> --
>> Best regards
>>
>> Sili Liu
>>
>>
>
> --
> Best regards
>
> Sili Liu
>


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-13 Thread Rahul Patwari
Hi Arvid,

Thanks for your inputs. They are super helpful.

Why do you need the window operator at all? Couldn't you just backpressure
> on the async I/O by delaying the processing there?
>

I haven't explored this approach. Wouldn't the backpressure get propagated
upstream and the consumption rate from Kafka get affected by it?

What's keeping you from attaching the offset of the Kafka records to A, B,
> C and write the offset when writing the record into the sink? (Probably
> need to wrap your sink function into a delegating sink function)
>

I am new to delegating sink. Can you please help me to understand how this
approach helps in failures or recovery to replay records from Kafka?
Currently, I am using auto offset commit of Kafka Consumer, which I think
commits offsets every 5 seconds, by default.

Checkpointing will definitely solve the problem. But, replaying records
from Kafka seems like a simpler approach to guarantee "at least once"
processing throughout the life of the stateless pipeline. Replaying records
from Kafka also seems like a simpler approach operationally.

Is there no other way to guarantee "at least once" processing without
checkpointing?

Checkpointing seems to be the only approach to guarantee "at least
once"/"exactly once" processing for stateful pipelines. But support for
replaying records to guarantee "at least once" processing would be helpful.

I know that Checkpointing has "at least once" mode. Probably we can add one
more mode where records are replayed from Source and State is not
checkpointed. Just a Suggestion. What are your thoughts? In this case, this
approach will be very helpful where only Kafka offsets are checkpointed.

Thanks,
Rahul


On Tue, Apr 13, 2021 at 7:20 PM Arvid Heise  wrote:

> Hi Rahul,
>
> This pipeline should process millions of records per day with low latency.
>> I am avoiding Checkpointing, as the records in the Window operator and
>> in-flight records in the Async I/O operator are persisted along with the
>> Kafka offsets. But the records in Window and Async I/O operators can be
>> obtained just by replaying the records from Kafka Source, which is the
>> approach I want to take.
>> There is Deduplication logic in the pipeline. So, I am preferring to
>> replay records in case of failures rather than storing the incremental
>> snapshots.
>>
> Did you measure how much data is being checkpointed? A few million records
> per day should just be a few megabytes per checkpoint.
> It sounds to me as if you would rather want your approach because it's
> conceptually more sound but not because it is necessary.
>
> I'm asking because delaying records and using async I/O is increasing your
> latency significantly anyways. So a couple of additional ms to the
> checkpoint don't sound like it will invalidate your use case to me. It will
> also be cheaper if you factor in your work time and will also work if you
> ever extend your pipeline to hold state.
> Recovery should also not be worse because you restore the records from
> blob storage instead of fetching it from the source system.
>
> Also let me repeat these questions
>
>> 1. Why do you need the window operator at all? Couldn't you just
>> backpressure on the async I/O by delaying the processing there?
>>
> 2. What's keeping you from attaching the offset of the Kafka records to A,
>> B, C and write the offset when writing the record into the sink? (Probably
>> need to wrap your sink function into a delegating sink function)
>>
>
>
>
>
> On Tue, Apr 13, 2021 at 12:33 PM Rahul Patwari 
> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the reply.
>>
>> could you please help me to understand how the at least once guarantee
>>> would work without checkpointing in your case?
>>>
>>
>> This was the plan to maintain "at least once" guarantee:
>> Logic at Sink:
>> The DataStream on which Sink Function is applied, on the same DataStream,
>> apply a widowing operator and compute min of the timestamps of records and
>> persist the timestamp in Cassandra -> This will persist the record
>> timestamp below which all the records are processed, say, every 10 seconds.
>> (The delay created by the Windowing Operator used here makes sure that the
>> timestamp is persisted in Cassandra only after it is written to Sink)
>> Note: There is another Windowing Operator at the Source to delay the
>> processing of records.
>>
>> Logic at Source:
>> While creating the JobGraph, read the timestamp persisted in Cassandra
>> for each topic and configure the start position of Kafka Consumer.
>>
>> The problem is that the start positions are not respected when there are
>> Automatic restarts during failures. Basically, we wanted to read the
>> timestamp from Cassandra and start consuming from the timestamp even in
>> case of Automatic restarts during failures.
>>
>> This pipeline should process millions of records per day with low
>> latency.
>> I am avoiding Checkpointing, as the records in the Window operator and
>> in-flight records in the Asy

Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-13 Thread Klemens Muthmann
Hi,

I've just tried to run the basic example for Apache Flink
on an Apple MacBook Pro with the new M1 processor. I only need this for
development purposes. The actual thing is going to run on a Linux server later
on, so I would not mind if it only ran using the Rosetta compatibility layer.
Unfortunately it failed with the following stack trace:
flink-1.12.2 ./bin/flink run examples/streaming/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner 
(file:/Users/muthmann/Development/Flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar)
 to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to execute job 'Streaming WordCount'.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
'Streaming WordCount'.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
at 
java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
at 
java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelInactive(RestClient.java:588)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
org.apache.flink.shaded.netty4.io.netty.c

Re: NPE when aggregate window.

2021-04-13 Thread Si-li Liu
Hi,Dawid,

Thanks for your help. I use com.google.common.base.Objects.hashCode, pass
all the fields to it to generate the hashcode, and the equals method also
compares all the fields.
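
For reference, a minimal sketch of what a grouping key with a stable
hashCode/equals can look like (the field names here are only illustrative,
not the ones from our job):

import com.google.common.base.Objects;

// A grouping key whose hashCode/equals depend only on immutable value fields,
// so the same logical key always hashes to the same task manager.
public final class DataKey {
    private final String tenant;
    private final long bucket;

    public DataKey(String tenant, long bucket) {
        this.tenant = tenant;
        this.bucket = bucket;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DataKey)) return false;
        DataKey that = (DataKey) o;
        return bucket == that.bucket && Objects.equal(tenant, that.tenant);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(tenant, bucket);
    }
}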

Dawid Wysakowicz  wrote on Tue, Apr 13, 2021 at 8:10 PM:

> Hi,
>
> Could you check that your grouping key has a stable hashcode and equals?
> It is very likely caused by an unstable hashcode and that a record with an
> incorrect key ends up on a wrong task manager.
>
> Best,
>
> Dawid
> On 13/04/2021 08:47, Si-li Liu wrote:
>
> Hi,
>
> I encountered a weird NPE when trying to do an aggregate on a fixed window. If I
> set a small parallelism number, the whole job uses only one TaskManager and this
> NPE will not happen. But when the job scales to two TaskManagers, the
> TaskManager will crash at the Create stage. The Flink version I use is 1.11.1.
>
> The NPE exception stack is:
>
> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task []
> - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger,
> AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink:
> Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING
> to FAILED.
> java.io.IOException: Exception while applying AggregateFunction in
> aggregating state
> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(
> HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-
> 1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
> .java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1
> .jar:1.11.1]
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:
> 1.11.1]
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1
> .jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:
> 1.11.1]
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:
> 1.11.1]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by: java.lang.NullPointerException
> at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable
> .java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(
> HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> ... 13 more
> My aggregate code is
>
> public class AggregateDataEntry implements AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>
>     @Override
>     public Map<DataKey, DataIndex> createAccumulator() {
>         return new HashMap<>();
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>         accumulator.merge(value.f0, value.f1, DataIndex::add);
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>         a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>         return b;
>     }
> }
>
> Does anyone know anything about this NPE? Thanks!
> --
> Best regards
>
> Sili Liu
>
>

-- 
Best regards

Sili Liu


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-13 Thread Arvid Heise
Hi Rahul,

This pipeline should process millions of records per day with low latency.
> I am avoiding Checkpointing, as the records in the Window operator and
> in-flight records in the Async I/O operator are persisted along with the
> Kafka offsets. But the records in Window and Async I/O operators can be
> obtained just by replaying the records from Kafka Source, which is the
> approach I want to take.
> There is Deduplication logic in the pipeline. So, I am preferring to
> replay records in case of failures rather than storing the incremental
> snapshots.
>
Did you measure how much data is being checkpointed? A few million records
per day should just be a few megabytes per checkpoint.
It sounds to me as if you would rather want your approach because it's
conceptually more sound but not because it is necessary.

I'm asking because delaying records and using async I/O already increases your
latency significantly. So a couple of additional ms for the checkpoint doesn't
sound like it would invalidate your use case to me. It will also be cheaper if
you factor in your work time, and it will keep working if you ever extend your
pipeline to hold state.
Recovery should also not be worse, because you restore the records from blob
storage instead of fetching them from the source system.

Also let me repeat these questions

> 1. Why do you need the window operator at all? Couldn't you just
> backpressure on the async I/O by delaying the processing there?
>
2. What's keeping you from attaching the offset of the Kafka records to A,
> B, C and write the offset when writing the record into the sink? (Probably
> need to wrap your sink function into a delegating sink function)
>
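
To make question 2 a bit more concrete, here is a rough, untested sketch of
such a delegating sink. RecordWithOffset and OffsetStore are placeholders for
your own types, not Flink APIs:

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/** Hypothetical carrier that attaches the Kafka offset to the payload. */
class RecordWithOffset<T> {
    final T payload;
    final String topic;
    final int partition;
    final long offset;

    RecordWithOffset(T payload, String topic, int partition, long offset) {
        this.payload = payload;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

/** Hypothetical offset store, e.g. backed by Cassandra. */
interface OffsetStore extends java.io.Serializable {
    void commit(String topic, int partition, long offset);
}

/** Delegating sink: writes the record first, then persists its offset. */
public class OffsetCommittingSink<T> extends RichSinkFunction<RecordWithOffset<T>> {

    private final SinkFunction<T> delegate;
    private final OffsetStore offsets;

    public OffsetCommittingSink(SinkFunction<T> delegate, OffsetStore offsets) {
        this.delegate = delegate;
        this.offsets = offsets;
    }

    @Override
    public void invoke(RecordWithOffset<T> value, Context ctx) throws Exception {
        // Write the payload first and the offset afterwards, so a failure in
        // between can only lead to re-processing (at-least-once), never to loss.
        delegate.invoke(value.payload, ctx);
        offsets.commit(value.topic, value.partition, value.offset);
    }
}

If the wrapped sink is itself a RichSinkFunction, you would also need to
forward open()/close() and the runtime context to it.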




On Tue, Apr 13, 2021 at 12:33 PM Rahul Patwari 
wrote:

> Hi Arvid,
>
> Thanks for the reply.
>
> could you please help me to understand how the at least once guarantee
>> would work without checkpointing in your case?
>>
>
> This was the plan to maintain "at least once" guarantee:
> Logic at Sink:
> The DataStream on which Sink Function is applied, on the same DataStream,
> apply a widowing operator and compute min of the timestamps of records and
> persist the timestamp in Cassandra -> This will persist the record
> timestamp below which all the records are processed, say, every 10 seconds.
> (The delay created by the Windowing Operator used here makes sure that the
> timestamp is persisted in Cassandra only after it is written to Sink)
> Note: There is another Windowing Operator at the Source to delay the
> processing of records.
>
> Logic at Source:
> While creating the JobGraph, read the timestamp persisted in Cassandra for
> each topic and configure the start position of Kafka Consumer.
>
> The problem is that the start positions are not respected when there are
> Automatic restarts during failures. Basically, we wanted to read the
> timestamp from Cassandra and start consuming from the timestamp even in
> case of Automatic restarts during failures.
>
> This pipeline should process millions of records per day with low latency.
> I am avoiding Checkpointing, as the records in the Window operator and
> in-flight records in the Async I/O operator are persisted along with the
> Kafka offsets. But the records in Window and Async I/O operators can be
> obtained just by replaying the records from Kafka Source, which is the
> approach I want to take.
> There is Deduplication logic in the pipeline. So, I am preferring to
> replay records in case of failures rather than storing the incremental
> snapshots.
>
> Thanks,
> Rahul
>
> On Tue, Apr 13, 2021 at 2:53 PM Arvid Heise  wrote:
>
>> Hi Rahul,
>>
>> could you please help me to understand how the at least once guarantee
>> would work without checkpointing in your case?
>>
>> Let's say you read records A, B, C. You use a window to delay processing,
>> so let's say A passes and B, C are still in the window for the trigger.
>>
>> Now do you want to auto commit the offset of A after it being written in
>> the sink? If so, what's keeping you from attaching the offset of the Kafka
>> records to A, B, C and write the offset when writing the record into the
>> sink? (Probably need to wrap your sink function into a delegating sink
>> function)
>>
>> The way, Flink does the checkpointing is that it checkpoints the offset
>> of C, and the state of the window (containing B, C) to avoid data loss. Why
>> is that not working for you? Which state size do you expect?
>>
>> Why do you need the window operator at all? Couldn't you just
>> backpressure on the async I/O by delaying the processing there? Then there
>> would be no need to change anything.
>>
>> On Tue, Apr 13, 2021 at 10:00 AM Roman Khachatryan 
>> wrote:
>>
>>> Hi Rahul,
>>>
>>> Right. There are no workarounds as far as I know.
>>>
>>> Regards,
>>> Roman
>>>
>>> On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari
>>>  wrote:
>>> >
>>> > Hi Roman, Arvid,
>>> >
>>> > So, to achieve "at least once" guarantee, currently, automatic restart
>>> of Flink should be disable

RE: Clarification about Flink's managed memory and metric monitoring

2021-04-13 Thread Alexis Sarda-Espinosa
Hi Xintong,

Thanks for the info. Is there any way to access these metrics outside of the 
UI? I suppose Flink’s reporters might provide them, but will they also be 
available through the REST interface (or another interface)?

Regards,
Alexis.

From: Xintong Song 
Sent: Tuesday, 13 April 2021 14:30
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Clarification about Flink's managed memory and metric monitoring

Hi Alexis,

First of all, I strongly recommend not looking at the JVM metrics. These
metrics are fetched directly from the JVM and do not correspond well to
Flink's memory configurations. They were introduced a long time ago and are
preserved mostly for compatibility. IMO, they bring more confusion than
convenience. In Flink 1.12, there is a newly designed TM metrics page in the
web UI, which clearly shows how the metrics correspond to Flink's memory
configurations (if any).

Concerning your questions:
1. Yes, increasing the framework/task off-heap memory sizes should increase the
direct memory capacity. Increasing the network memory size should also do that.
2. When 'state.backend.rocksdb.memory.managed' is true, RocksDB uses managed
memory. Managed memory is not measured by any JVM metric. It's not managed by
the JVM, meaning that it's not limited by '-XX:MaxDirectMemorySize' and is not
controlled by the garbage collectors.


Thank you~

Xintong Song


On Tue, Apr 13, 2021 at 7:53 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
Hello,

I have a Flink TM configured with taskmanager.memory.managed.size: 1372m. There 
is a streaming job using RocksDB for checkpoints, so I assume some of this 
memory will indeed be used.

I was looking at the metrics exposed through the REST interface, and I queried 
some of them:

/taskmanagers/c3c960d79c1eb2341806bfa2b2d66328/metrics?get=Status.JVM.Memory.Heap.Committed,Status.JVM.Memory.NonHeap.Committed,Status.JVM.Memory.Direct.MemoryUsed
 | jq
[
  {
"id": "Status.JVM.Memory.Heap.Committed",
"value": "1652031488"
  },
  {
"id": "Status.JVM.Memory.NonHeap.Committed",
"value": "234291200"
 223 MiB
  },
  {
"id": "Status.JVM.Memory.Direct.MemoryUsed",
"value": "375015427"
358 MiB
  },
  {
"id": "Status.JVM.Memory.Direct.TotalCapacity",
"value": "375063552"
358 MiB
  }
]

I presume direct memory is being used by Flink and its networking stack, as 
well as by the JVM itself. To be sure:


  1.  Increasing "taskmanager.memory.framework.off-heap.size" or 
"taskmanager.memory.task.off-heap.size" should increase 
Status.JVM.Memory.Direct.TotalCapacity, right?
  2.  I presume the native memory used by RocksDB cannot be tracked with these 
JVM metrics even if "state.backend.rocksdb.memory.managed" is true, right?

Based on this question: 
https://stackoverflow.com/questions/30622818/off-heap-native-heap-direct-memory-and-native-memory,
 I imagine Flink/RocksDB either allocates memory completely independently of 
the JVM, or it uses unsafe. Since the documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html#managed-memory)
 states that "Managed memory is managed by Flink and is allocated as native 
memory (off-heap)", I thought this native memory might show up as part of 
direct memory tracking, but I guess it doesn’t.

Regards,
Alexis.



Re: Clarification about Flink's managed memory and metric monitoring

2021-04-13 Thread Xintong Song
Hi Alexis,

First of all, I strongly recommend not looking at the JVM metrics. These
metrics are fetched directly from the JVM and do not correspond well to
Flink's memory configurations. They were introduced a long time ago and are
preserved mostly for compatibility. IMO, they bring more confusion than
convenience. In Flink 1.12, there is a newly designed TM metrics page in
the web UI, which clearly shows how the metrics correspond to Flink's
memory configurations (if any).

Concerning your questions:
1. Yes, increasing the framework/task off-heap memory sizes should increase
the direct memory capacity. Increasing the network memory size should also do
that.
2. When 'state.backend.rocksdb.memory.managed' is true, RocksDB uses
managed memory. Managed memory is not measured by any JVM metric. It's not
managed by the JVM, meaning that it's not limited by '-XX:MaxDirectMemorySize'
and is not controlled by the garbage collectors.

Thank you~

Xintong Song



On Tue, Apr 13, 2021 at 7:53 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hello,
>
>
>
> I have a Flink TM configured with taskmanager.memory.managed.size: 1372m.
> There is a streaming job using RocksDB for checkpoints, so I assume some of
> this memory will indeed be used.
>
>
>
> I was looking at the metrics exposed through the REST interface, and I
> queried some of them:
>
>
>
> /taskmanagers/c3c960d79c1eb2341806bfa2b2d66328/metrics?get=Status.JVM.Memory.Heap.Committed,Status.JVM.Memory.NonHeap.Committed,Status.JVM.Memory.Direct.MemoryUsed
> | jq
>
> [
>
>   {
>
> "id": "Status.JVM.Memory.Heap.Committed",
>
> "value": "1652031488"
>
>   },
>
>   {
>
> "id": "Status.JVM.Memory.NonHeap.Committed",
>
> "value": "234291200"
>  223 MiB
>
>   },
>
>   {
>
> "id": "Status.JVM.Memory.Direct.MemoryUsed",
>
> "value": "375015427"
> 358 MiB
>
>   },
>
>   {
>
> "id": "Status.JVM.Memory.Direct.TotalCapacity",
>
> "value": "375063552"
> 358 MiB
>
>   }
>
> ]
>
>
>
> I presume direct memory is being used by Flink and its networking stack,
> as well as by the JVM itself. To be sure:
>
>
>
>1. Increasing "taskmanager.memory.framework.off-heap.size" or
>"taskmanager.memory.task.off-heap.size" should increase
>Status.JVM.Memory.Direct.TotalCapacity, right?
>2. I presume the native memory used by RocksDB cannot be tracked with
>these JVM metrics even if "state.backend.rocksdb.memory.managed" is true,
>right?
>
>
>
> Based on this question:
> https://stackoverflow.com/questions/30622818/off-heap-native-heap-direct-memory-and-native-memory,
> I imagine Flink/RocksDB either allocates memory completely independently of
> the JVM, or it uses unsafe. Since the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html#managed-memory)
> states that "Managed memory is managed by Flink and is allocated as native
> memory (off-heap)", I thought this native memory might show up as part of
> direct memory tracking, but I guess it doesn’t.
>
>
>
> Regards,
>
> Alexis.
>
>
>


Re: Python Integration with Ververica Platform

2021-04-13 Thread Dawid Wysakowicz
I'd recommend reaching out directly to Ververica. Ververica platform is
not part of the open-source Apache Flink project.

I can connect you with Konstantin who I am sure will be happy to answer
your question ;)

Best,

Dawid

On 12/04/2021 15:40, Robert Cullen wrote:
> I've been using the Community Edition v2.4.  Just wondering if there
> is a python integration coming in future versions.
>
> tnanks
>
> -- 
> Robert Cullen
> 240-475-4490




Re: NPE when aggregate window.

2021-04-13 Thread Dawid Wysakowicz
Hi,

Could you check that your grouping key has a stable hashcode and equals?
It is very likely caused by an unstable hashcode and that a record with
an incorrect key ends up on a wrong task manager.

Best,

Dawid

On 13/04/2021 08:47, Si-li Liu wrote:
> Hi, 
>
> I encountered a weird NPE when trying to do an aggregate on a fixed window. If
> I set a small parallelism number, the whole job uses only one
> TaskManager and this NPE will not happen. But when the job scales to two
> TaskManagers, the TaskManager will crash at the Create stage. The Flink
> version I use is 1.11.1.
>
> The NPE exception stack is:
>
> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task
> [] - Window(TumblingProcessingTimeWindows(5000),
> ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction)
> -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f)
> switched from RUNNING to FAILED.
> java.io.IOException: Exception while applying
> AggregateFunction in aggregating state
> at
> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> ... 13 more
> My aggregate code is
> public class AggregateDataEntry implements AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>
>     @Override
>     public Map<DataKey, DataIndex> createAccumulator() {
>         return new HashMap<>();
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>         accumulator.merge(value.f0, value.f1, DataIndex::add);
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>         a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>         return b;
>     }
> }
> Does anyone know anything about this NPE? Thanks!
> -- 
> Best regards
>
> Sili Liu




Clarification about Flink's managed memory and metric monitoring

2021-04-13 Thread Alexis Sarda-Espinosa
Hello,

I have a Flink TM configured with taskmanager.memory.managed.size: 1372m. There 
is a streaming job using RocksDB for checkpoints, so I assume some of this 
memory will indeed be used.

I was looking at the metrics exposed through the REST interface, and I queried 
some of them:

/taskmanagers/c3c960d79c1eb2341806bfa2b2d66328/metrics?get=Status.JVM.Memory.Heap.Committed,Status.JVM.Memory.NonHeap.Committed,Status.JVM.Memory.Direct.MemoryUsed
 | jq
[
  {
"id": "Status.JVM.Memory.Heap.Committed",
"value": "1652031488"
  },
  {
"id": "Status.JVM.Memory.NonHeap.Committed",
"value": "234291200"
 223 MiB
  },
  {
"id": "Status.JVM.Memory.Direct.MemoryUsed",
"value": "375015427"
358 MiB
  },
  {
"id": "Status.JVM.Memory.Direct.TotalCapacity",
"value": "375063552"
358 MiB
  }
]

I presume direct memory is being used by Flink and its networking stack, as 
well as by the JVM itself. To be sure:


  1.  Increasing "taskmanager.memory.framework.off-heap.size" or 
"taskmanager.memory.task.off-heap.size" should increase 
Status.JVM.Memory.Direct.TotalCapacity, right?
  2.  I presume the native memory used by RocksDB cannot be tracked with these 
JVM metrics even if "state.backend.rocksdb.memory.managed" is true, right?

Based on this question: 
https://stackoverflow.com/questions/30622818/off-heap-native-heap-direct-memory-and-native-memory,
 I imagine Flink/RocksDB either allocates memory completely independently of 
the JVM, or it uses unsafe. Since the documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html#managed-memory)
 states that "Managed memory is managed by Flink and is allocated as native 
memory (off-heap)", I thought this native memory might show up as part of 
direct memory tracking, but I guess it doesn't.

Regards,
Alexis.



Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-13 Thread Rahul Patwari
Hi Arvid,

Thanks for the reply.

could you please help me to understand how the at least once guarantee
> would work without checkpointing in your case?
>

This was the plan to maintain "at least once" guarantee:
Logic at Sink:
On the same DataStream to which the Sink Function is applied, apply a
windowing operator, compute the min of the record timestamps, and persist
that timestamp in Cassandra -> this persists the record timestamp below
which all the records are processed, say, every 10 seconds.
(The delay created by the Windowing Operator used here makes sure that the
timestamp is persisted in Cassandra only after it is written to Sink)
Note: There is another Windowing Operator at the Source to delay the
processing of records.
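
A rough sketch of that bookkeeping branch (the timestamp extraction and the
Cassandra sink are our own code and are shown here as parameters):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LowWatermarkBookkeeping {

    /**
     * Persists the minimum record timestamp of every 10-second window, i.e.
     * the timestamp below which all records are considered processed.
     */
    public static void attach(DataStream<Long> recordTimestamps,
                              SinkFunction<Long> cassandraSink) {
        recordTimestamps
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce((a, b) -> Math.min(a, b))
                .addSink(cassandraSink);
    }
}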

Logic at Source:
While creating the JobGraph, read the timestamp persisted in Cassandra for
each topic and configure the start position of Kafka Consumer.
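
A rough sketch of that wiring (reading the timestamp from Cassandra is our own
code, so the value is simply passed in here):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetAwareSourceFactory {

    /** Builds a consumer that starts from an externally persisted timestamp. */
    public static FlinkKafkaConsumer<String> createConsumer(
            String topic, Properties kafkaProps, long startTimestampMillis) {

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProps);

        // Per the docs, this start position is only used on a fresh submission;
        // it is not used when the job is automatically restored from a failure.
        consumer.setStartFromTimestamp(startTimestampMillis);
        return consumer;
    }
}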

The problem is that the start positions are not respected when there are
Automatic restarts during failures. Basically, we wanted to read the
timestamp from Cassandra and start consuming from the timestamp even in
case of Automatic restarts during failures.

This pipeline should process millions of records per day with low latency.
I am avoiding Checkpointing, as the records in the Window operator and
in-flight records in the Async I/O operator are persisted along with the
Kafka offsets. But the records in Window and Async I/O operators can be
obtained just by replaying the records from Kafka Source, which is the
approach I want to take.
There is Deduplication logic in the pipeline. So, I am preferring to replay
records in case of failures rather than storing the incremental snapshots.

Thanks,
Rahul

On Tue, Apr 13, 2021 at 2:53 PM Arvid Heise  wrote:

> Hi Rahul,
>
> could you please help me to understand how the at least once guarantee
> would work without checkpointing in your case?
>
> Let's say you read records A, B, C. You use a window to delay processing,
> so let's say A passes and B, C are still in the window for the trigger.
>
> Now do you want to auto commit the offset of A after it being written in
> the sink? If so, what's keeping you from attaching the offset of the Kafka
> records to A, B, C and write the offset when writing the record into the
> sink? (Probably need to wrap your sink function into a delegating sink
> function)
>
> The way, Flink does the checkpointing is that it checkpoints the offset of
> C, and the state of the window (containing B, C) to avoid data loss. Why is
> that not working for you? Which state size do you expect?
>
> Why do you need the window operator at all? Couldn't you just backpressure
> on the async I/O by delaying the processing there? Then there would be no
> need to change anything.
>
> On Tue, Apr 13, 2021 at 10:00 AM Roman Khachatryan 
> wrote:
>
>> Hi Rahul,
>>
>> Right. There are no workarounds as far as I know.
>>
>> Regards,
>> Roman
>>
>> On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari
>>  wrote:
>> >
>> > Hi Roman, Arvid,
>> >
>> > So, to achieve "at least once" guarantee, currently, automatic restart
>> of Flink should be disabled?
>> > Is there any workaround to get "at least once" semantics with Flink
>> Automatic restarts in this case?
>> >
>> > Regards,
>> > Rahul
>> >
>> > On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan 
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Thanks for the clarification.
>> >>
>> >> > Other than managing offsets externally, Are there any other ways to
>> guarantee "at least once" processing without enabling checkpointing?
>> >>
>> >> That's currently not possible, at least with the default connector.
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
>> >>  wrote:
>> >> >
>> >> > Hi Roman,
>> >> >
>> >> > Thanks for the reply.
>> >> > This is what I meant by Internal restarts - Automatic restore of
>> Flink Job from a failure. For example, pipeline restarts when Fixed delay
>> or Failure Rate restart strategies are configured.
>> >> >
>> >> > Quoting documentation in this link - Configuring Kafka Consumer
>> start position configuration
>> >> >
>> >> >> Note that these start position configuration methods do not affect
>> the start position when the job is automatically restored from a failure
>> >> >
>> >> >
>> >> >
>> >> > It seems that there will be data loss even when offsets are managed
>> externally when there are pipeline restarts due to a failure, say, an
>> exception. On the other hand, when the pipeline is stopped and
>> resubmitted(say, an upgrade), there won't be any data loss as offsets are
>> retrieved from an external store and configured while starting Kafka
>> Consumer.
>> >> >
>> >> > We do not want to enable checkpointing as the pipeline is stateless.
>> We have Deduplication logic in the pipeline and the processing is
>> idempotent.
>> >> >
>> >> > Other than managing offsets externally, Are there any other ways to
>> guarantee "at least once" processing without enabl

Re: Flink Metric isBackPressured not available

2021-04-13 Thread Claude M
Thanks for your reply.  I'm using Flink 1.12.  I'm checking in Datadog and
the metric is not available there.
It has other task/operator metrics such as numRecordsIn/numRecordsOut there
but not the isBackPressured.


On Mon, Apr 12, 2021 at 8:40 AM Roman Khachatryan  wrote:

> Hi,
>
> The metric is registered upon task deployment and reported periodically.
>
> Which Flink version are you using? The metric was added in 1.10.
> Are you checking it in the UI?
>
> Regards,
> Roman
>
> On Fri, Apr 9, 2021 at 8:50 PM Claude M  wrote:
> >
> > Hello,
> >
> > The documentation here
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
> states there is a isBackPressured metric available yet I don't see it.  Any
> ideas why?
> >
> >
> > Thanks
>


Re: Flink 1.11.4?

2021-04-13 Thread Yuval Itzchakov
Roman, is there an ETA on 1.13?

On Mon, Apr 12, 2021, 16:17 Roman Khachatryan  wrote:

> Hi Maciek,
>
> There are no specific plans for 1.11.4 yet as far as I know.
> The official policy is to support the current and previous minor
> release [1]. So 1.12 and 1.13 will be officially supported once 1.13
> is released.
> However, it's likely that 1.11.4 will still be released.
>
> [1]
> https://flink.apache.org/downloads.html#update-policy-for-old-releases
>
> Regards,
> Roman
>
>
> On Mon, Apr 12, 2021 at 10:35 AM Maciek Próchniak  wrote:
> >
> > Hello,
> >
> > I'd like to ask if there are any plans to release 1.11.4 - I understand
> > it will be the last bugfix release for the 1.11.x branch, as 1.13.0 is "just
> > around the corner"?
> >
> > There are a few fixes we'd like to use - e.g.
> > https://issues.apache.org/jira/browse/FLINK-9844,
> > https://issues.apache.org/jira/browse/FLINK-21164
> >
> >
> > thanks,
> >
> > maciek
> >
>


Re: [External] : Re: Conflict in the document - About native Kubernetes per job mode

2021-04-13 Thread Yang Wang
I think it makes sense to have such a simple fix.

Could you please create a ticket and attach a PR?

Best,
Yang

Fuyao Li  wrote on Tue, Apr 13, 2021 at 2:24 PM:

> Hello Yang,
>
>
>
> It is very kind of you to give such a detailed explanation! Thanks for the
> clarification.
>
>
>
> For the small document fix I mentioned, what do you think?
>
>
>
> Best,
>
> Fuyao
>
>
>
> *From: *Yang Wang 
> *Date: *Monday, April 12, 2021 at 23:03
> *To: *Fuyao Li 
> *Cc: *user , Yan Wang 
> *Subject: *[External] : Re: Conflict in the document - About native
> Kubernetes per job mode
>
> Hi Fuyao,
>
>
>
> Currently, Flink only supports per-job mode for YARN. The standalone job
> cluster has been replaced with the standalone application mode
>
> after FLIP-85 [1]. Neither standalone Flink on K8s nor the native K8s
> integration supports per-job mode.
>
>
>
> The video you attached shows a PoC implementation for the presentation. We
> introduced the Kubernetes application mode in release 1.11
>
> and agreed not to have per-job cluster support. The only reason is that
> it is not very convenient to ship the job graphs, user jars, and artifacts in
>
> a Kubernetes environment.
>
>
>
> [1].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode
> 
>
>
>
> Best,
>
> Yang
>
>
>
Fuyao Li  wrote on Tue, Apr 13, 2021 at 8:10 AM:
>
> Hello Community, Yang,
>
>
>
> I noticed a conflict in the document for per-job mode support for
> Kubernetes.
>
> In the doc here [1], it mentions
>
> in a Flink Job Cluster, the available cluster manager (like YARN or
> Kubernetes) is used to spin up a cluster for each submitted job and this
> cluster is available to that job only.
>
> It implies per job mode is supported in Kubernetes.
>
>
>
> However, in the docs [2] and [3], it clearly points out per-job mode is
> not supported in Kubernetes.
>
>
>
> *This is a conflicting statement and is kind of misleading. If needed, I can
> create an MR to delete the statement in [1] for Kubernetes. It is a small
> fix.*
>
>
>
> I also noticed another thing in the video [4] at 25:08. Yang, you are
> executing a command with the -e kubernetes-per-job flag. I tried it and found
> that such a command is not supported in the Flink distribution at all. I
> noticed the version you are using during the demo is 1.11-snapshot. Are you
> modifying the source code and generating an internal version of Flink…?
>
>
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#flink-job-cluster
> 
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#per-job-cluster-mode
> 
>
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#per-job-cluster-mode
> 
>
> [4] https://www.youtube.com/watch?v=pdFPr_VOWTU&t=833s
> 
>
>
>
> Best,
>
> Fuyao
>
>


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-13 Thread Arvid Heise
Hi Rahul,

could you please help me to understand how the at least once guarantee
would work without checkpointing in your case?

Let's say you read records A, B, C. You use a window to delay processing,
so let's say A passes and B, C are still in the window for the trigger.

Now do you want to auto commit the offset of A after it being written in
the sink? If so, what's keeping you from attaching the offset of the Kafka
records to A, B, C and write the offset when writing the record into the
sink? (Probably need to wrap your sink function into a delegating sink
function)

The way, Flink does the checkpointing is that it checkpoints the offset of
C, and the state of the window (containing B, C) to avoid data loss. Why is
that not working for you? Which state size do you expect?

Why do you need the window operator at all? Couldn't you just backpressure
on the async I/O by delaying the processing there? Then there would be no
need to change anything.

On Tue, Apr 13, 2021 at 10:00 AM Roman Khachatryan  wrote:

> Hi Rahul,
>
> Right. There are no workarounds as far as I know.
>
> Regards,
> Roman
>
> On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari
>  wrote:
> >
> > Hi Roman, Arvid,
> >
> > So, to achieve "at least once" guarantee, currently, automatic restart
> of Flink should be disabled?
> > Is there any workaround to get "at least once" semantics with Flink
> Automatic restarts in this case?
> >
> > Regards,
> > Rahul
> >
> > On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan 
> wrote:
> >>
> >> Hi,
> >>
> >> Thanks for the clarification.
> >>
> >> > Other than managing offsets externally, Are there any other ways to
> guarantee "at least once" processing without enabling checkpointing?
> >>
> >> That's currently not possible, at least with the default connector.
> >>
> >> Regards,
> >> Roman
> >>
> >> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
> >>  wrote:
> >> >
> >> > Hi Roman,
> >> >
> >> > Thanks for the reply.
> >> > This is what I meant by Internal restarts - Automatic restore of
> Flink Job from a failure. For example, pipeline restarts when Fixed delay
> or Failure Rate restart strategies are configured.
> >> >
> >> > Quoting documentation in this link - Configuring Kafka Consumer start
> position configuration
> >> >
> >> >> Note that these start position configuration methods do not affect
> the start position when the job is automatically restored from a failure
> >> >
> >> >
> >> >
> >> > It seems that there will be data loss even when offsets are managed
> externally when there are pipeline restarts due to a failure, say, an
> exception. On the other hand, when the pipeline is stopped and
> resubmitted(say, an upgrade), there won't be any data loss as offsets are
> retrieved from an external store and configured while starting Kafka
> Consumer.
> >> >
> >> > We do not want to enable checkpointing as the pipeline is stateless.
> We have Deduplication logic in the pipeline and the processing is
> idempotent.
> >> >
> >> > Other than managing offsets externally, Are there any other ways to
> guarantee "at least once" processing without enabling checkpointing?
> >> >
> >> > Thanks,
> >> > Rahul
> >> >
> >> > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan 
> wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> Could you please explain what you mean by internal restarts?
> >> >>
> >> >> If you commit offsets or timestamps from sink after emitting records
> >> >> to the external system then there should be no data loss.
> >> >> Otherwise (if you commit offsets earlier), you have to persist
> >> >> in-flight records to avoid data loss (i.e. enable checkpointing).
> >> >>
> >> >> Regards,
> >> >> Roman
> >> >>
> >> >> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
> >> >>  wrote:
> >> >> >
> >> >> > Hello,
> >> >> >
> >> >> > Context:
> >> >> >
> >> >> > We have a stateless Flink Pipeline which reads from Kafka topics.
> >> >> > The pipeline has a Windowing operator(Used only for introducing a
> delay in processing records) and AsyncI/O operators (used for
> Lookup/Enrichment).
> >> >> >
> >> >> > "At least Once" Processing semantics is needed for the pipeline to
> avoid data loss.
> >> >> >
> >> >> > Checkpointing is disabled and we are dependent on the auto offset
> commit of Kafka consumer for fault tolerance currently.
> >> >> >
> >> >> > As auto offset commit indicates that "the record is successfully
> read", instead of "the record is successfully processed", there will be
> data loss if there is a restart when the offset is committed to Kafka but
> not successfully processed by the Flink Pipeline, as the record is NOT
> replayed again when the pipeline is restarted.
> >> >> >
> >> >> > Checkpointing can solve this problem. But, since the pipeline is
> stateless, we do not want to use checkpointing, which will persist all the
> records in Windowing Operator and in-flight Async I/O calls.
> >> >> >
> >> >> > Question:
> >> >> >
> >> >> > We are looking for other ways to guarantee "at least once"
> proc

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-13 Thread Roman Khachatryan
Hi Rahul,

Right. There are no workarounds as far as I know.

Regards,
Roman

On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari
 wrote:
>
> Hi Roman, Arvid,
>
> So, to achieve "at least once" guarantee, currently, automatic restart of 
> Flink should be disabled?
> Is there any workaround to get "at least once" semantics with Flink Automatic 
> restarts in this case?
>
> Regards,
> Rahul
>
> On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> Thanks for the clarification.
>>
>> > Other than managing offsets externally, Are there any other ways to 
>> > guarantee "at least once" processing without enabling checkpointing?
>>
>> That's currently not possible, at least with the default connector.
>>
>> Regards,
>> Roman
>>
>> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
>>  wrote:
>> >
>> > Hi Roman,
>> >
>> > Thanks for the reply.
>> > This is what I meant by Internal restarts - Automatic restore of Flink Job 
>> > from a failure. For example, pipeline restarts when Fixed delay or Failure 
>> > Rate restart strategies are configured.
>> >
>> > Quoting documentation in this link - Configuring Kafka Consumer start 
>> > position configuration
>> >
>> >> Note that these start position configuration methods do not affect the 
>> >> start position when the job is automatically restored from a failure
>> >
>> >
>> >
>> > It seems that there will be data loss even when offsets are managed 
>> > externally when there are pipeline restarts due to a failure, say, an 
>> > exception. On the other hand, when the pipeline is stopped and 
>> > resubmitted(say, an upgrade), there won't be any data loss as offsets are 
>> > retrieved from an external store and configured while starting Kafka 
>> > Consumer.
>> >
>> > We do not want to enable checkpointing as the pipeline is stateless. We 
>> > have Deduplication logic in the pipeline and the processing is idempotent.
>> >
>> > Other than managing offsets externally, Are there any other ways to 
>> > guarantee "at least once" processing without enabling checkpointing?
>> >
>> > Thanks,
>> > Rahul
>> >
>> > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan  wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could you please explain what you mean by internal restarts?
>> >>
>> >> If you commit offsets or timestamps from sink after emitting records
>> >> to the external system then there should be no data loss.
>> >> Otherwise (if you commit offsets earlier), you have to persist
>> >> in-flight records to avoid data loss (i.e. enable checkpointing).
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>> >>  wrote:
>> >> >
>> >> > Hello,
>> >> >
>> >> > Context:
>> >> >
>> >> > We have a stateless Flink Pipeline which reads from Kafka topics.
>> >> > The pipeline has a Windowing operator(Used only for introducing a delay 
>> >> > in processing records) and AsyncI/O operators (used for 
>> >> > Lookup/Enrichment).
>> >> >
>> >> > "At least Once" Processing semantics is needed for the pipeline to 
>> >> > avoid data loss.
>> >> >
>> >> > Checkpointing is disabled and we are dependent on the auto offset 
>> >> > commit of Kafka consumer for fault tolerance currently.
>> >> >
>> >> > As auto offset commit indicates that "the record is successfully read", 
>> >> > instead of "the record is successfully processed", there will be data 
>> >> > loss if there is a restart when the offset is committed to Kafka but 
>> >> > not successfully processed by the Flink Pipeline, as the record is NOT 
>> >> > replayed again when the pipeline is restarted.
>> >> >
>> >> > Checkpointing can solve this problem. But, since the pipeline is 
>> >> > stateless, we do not want to use checkpointing, which will persist all 
>> >> > the records in Windowing Operator and in-flight Async I/O calls.
>> >> >
>> >> > Question:
>> >> >
>> >> > We are looking for other ways to guarantee "at least once" processing 
>> >> > without checkpointing. One such way is to manage Kafka Offsets 
>> >> > Externally.
>> >> >
>> >> > We can maintain offsets of each partition of each topic in Cassandra(or 
>> >> > maintain timestamp, where all records with timestamps less than this 
>> >> > timestamp are successfully processed) and configure Kafka consumer 
>> >> > Start Position - setStartFromTimestamp() or 
>> >> > setStartFromSpecificOffsets()
>> >> >
>> >> > This will be helpful if the pipeline is manually restarted (say, 
>> >> > JobManager pod is restarted). But, how to avoid data loss in case of 
>> >> > internal restarts?
>> >> >
>> >> > Has anyone used this approach?
>> >> > What are other ways to guarantee "at least once" processing without 
>> >> > checkpointing for a stateless Flink pipeline?
>> >> >
>> >> > Thanks,
>> >> > Rahul


Re: Source Operators Stuck in the requestBufferBuilderBlocking

2021-04-13 Thread Arvid Heise
Hi Sihan,

we managed to reproduce it, see [1]. It will be fixed in the next 1.12 and
the upcoming 1.13 release.

[1] https://issues.apache.org/jira/browse/FLINK-21992

On Tue, Apr 6, 2021 at 8:45 PM Roman Khachatryan  wrote:

> Hi Sihan,
>
> Unfortunately, we are unable to reproduce the issue so far. Could you
> please describe in more detail the job graph, in particular what are
> the downstream operators and whether there is any chaining?
>
> Do I understand correctly, that Flink returned back to normal at
> around 8:00; worked fine for ~3 hours; got stuck again; and then it
> was restarted?
>
> I'm also wondering whether requestBufferBuilderBlocking is just a
> frequent operation popping up in thread dump. Or do you actually see
> that Legacy source threads are *stuck* there?
>
> Could you please explain how the other metrics are calculated?
> (PURCHASE KAFKA NUM-SEC, PURCHASE OUTPOOL, PLI PURCHASE JOIN INPOOL).
> Or do you have rate metrics per source?
>
> Regards,
> Roman
>
>
>
> On Wed, Mar 31, 2021 at 1:44 AM Sihan You  wrote:
> >
> > Awesome. Let me know if you need any other information. Our application
> makes heavy use of event timers and keyed state. The load is very heavy, if
> that matters.
> > On Mar 29, 2021, 05:50 -0700, Piotr Nowojski ,
> wrote:
> >
> > Hi Sihan,
> >
> > Thanks for the information. Previously I was not able to reproduce this
> issue, but after adding a union I think I can see it happening.
> >
> > Best,
> > Piotrek
> >
> >> On Fri, Mar 26, 2021 at 22:59 Sihan You  wrote:
> >>
> >> This issue is not always reproducible. It happened 2~3 times in our
> development period of 3 months.
> >>
> >> On Fri, Mar 26, 2021 at 2:57 PM Sihan You  wrote:
> >>>
> >>> Hi,
> >>>
> >>> Thanks for responding. I'm working in a commercial organization so I
> cannot share the detailed stack with you. I will try to describe the issue
> as specifically as I can.
> >>> 
> >>> Above are more detailed stats of our job.
> >>> 1. How long did the job run until it got stuck?
> >>> about 9 hours.
> >>> 2. How often do you checkpoint or how many checkpoints succeeded?
> >>> I don't remember the exact number of successful checkpoints, but
> there should be around 2. Then the checkpoints started to fail because of
> timeouts.
> >>> 3. What were the typical checkpoint sizes? How much in-flight data was
> checkpointed? (A screenshot of the checkpoint tab in the Flink UI would
> suffice)
> >>> The first checkpoint was 5 TB and the second was 578 GB.
> >>> 4. Was the parallelism of the whole job 5? How is the topology roughly
> looking? (e.g., Source -> Map -> Sink?)
> >>> The source is a union of two source streams; one has a parallelism of
> 5 and the other has 80.
> >>> The job graph is like this:
> >>> source 1.1 (5 parallelism)  ->
> >>>                                union ->
> >>> source 1.2 (80 parallelism) ->
> >>>
> >>>                                          connect -> sink
> >>> source 2.1 (5 parallelism)  ->
> >>>                                union ->
> >>> source 2.2 (80 parallelism) ->
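
For clarity, the topology above corresponds to roughly the following
DataStream wiring (a sketch only; record types are simplified to String, and
the actual join function and sink are placeholders passed in as parameters):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class TopologySketch {

    static DataStreamSink<String> build(
            DataStream<String> source11,   // parallelism 5
            DataStream<String> source12,   // parallelism 80
            DataStream<String> source21,   // parallelism 5
            DataStream<String> source22,   // parallelism 80
            KeyedCoProcessFunction<String, String, String, String> joinFn,
            SinkFunction<String> sink) {

        DataStream<String> left = source11.union(source12);
        DataStream<String> right = source21.union(source22);

        return left
                .connect(right)
                .keyBy(r -> r, r -> r)   // keyed state and event timers live here
                .process(joinFn)
                .addSink(sink);
    }
}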
> >>> 5. Did you see any warns/errors in the logs related to checkpointing
> and I/O?
> >>> No error is thrown.
> >>> 6. What was your checkpoint storage (e.g. S3)? Is the application
> running in the same data-center (e.g. AWS)?
> >>> We are using HDFS as the state backend and the checkpoint dir.
> >>> The application is running in our own data center and in Kubernetes as
> a standalone job.
> >>>
> >>> On Fri, Mar 26, 2021 at 7:31 AM Piotr Nowojski 
> wrote:
> 
>  Hi Sihan,
> 
>  More importantly, could you create some example job that can
> reproduce that problem? It can have some fake sources and no business
> logic, but if you could provide us with something like that, it would allow
> us to analyse the problem without going back and forth with tens of
> questions.
> 
>  Best, Piotrek
> 
>  On Fri, Mar 26, 2021 at 11:40 Arvid Heise  wrote:
> >
> > Hi Sihan,
> >
> > thanks for reporting. This looks like a bug to me. I have opened an
> investigation ticket with the highest priority [1].
> >
> > Could you please provide some more context, so we have a chance to
> reproduce?
> > 1. How long did the job run until it got stuck?
> > 2. How often do you checkpoint or how many checkpoints succeeded?
> > 3. What were the typical checkpoint sizes? How much in-flight data
> was checkpointed? (A screenshot of the checkpoint tab in the Flink UI would
> suffice)
> > 4. Was the parallelism of the whole job 5? How is the topology
> roughly looking? (e.g., Source -> Map -> Sink?)
> > 5. Did you see any warns/errors in the logs related to checkpointing
> and I/O?
> > 6. What was your checkpoint storage (e.g. S3)? Is the application
> running in the same data-center (e.g. AWS)?
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21992
> >
> > On Thu, Mar 25, 2021 at 3:00 AM Sihan You 
> wrote:
> >>
> >> Hi,
> >>
> >> I ke