Re: flink on yarn: job AM attempt fails after the HDFS_DELEGATION_TOKEN has been cleared

2022-02-10 Thread xieyi
One more piece of information:
In this failure case,
security.kerberos.login.keytab was correctly configured in the Flink client's flink-conf.yaml.
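For reference, a minimal sketch of the flink-conf.yaml entries in question (the keytab path and principal below are placeholders, not values from this case):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM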




On Feb 11, 2022 at 15:18, xieyi wrote:


Hi all,
I have a question.
Since a Hadoop delegation token expires and is removed once it exceeds its Max
Lifetime (7 days by default), YARN mentions three strategies for long-running services to deal with this: https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#securing-long-lived-yarn-services


I would like to know how Flink on YARN handles Hadoop delegation token expiry; the official documentation does not seem to explain this clearly.


We are currently seeing the following failure in production:
Flink 1.12 on YARN; the YARN NodeManagers are deployed in containers and occasionally crash and restart. Once a Flink
job has been running for more than 7 days, if the NodeManager hosting that job's JM (AM) restarts, the AM makes a new attempt (the attempt picks up token 1377, the one obtained at job submission, which has already been removed from the NameNode), and the attempt fails with:


Failing this attempt.Diagnostics: token (HDFS_DELEGATION_TOKEN token 1377 
for user***) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 1377for user***) can't be found in cache


Questions: after the Hadoop delegation token is cleared, how does Flink on YARN renew it? Does it generate a new token?
  If a new token is generated, why does the AM attempt still pick up the already-cleared token (1377)?
Is this failure related to the containerized NodeManager deployment, i.e. were the files holding the keytab wiped when the NodeManager restarted?



Re: CDC using Query

2022-02-10 Thread mohan radhakrishnan
Thanks. I looked at it. Our primary DBs are Oracle and MySQL. The Flink CDC
Connector uses Debezium, I think. So Ververica doesn't have a Flink CDC
connector for Oracle?
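For reference, a minimal sketch of a MySQL CDC source showing where the per-source serverId that Leonard mentions in the quoted reply below is configured. This assumes the flink-cdc-connectors 2.x MySqlSource API; host, credentials, database and table names are placeholders:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcSketch {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.orders")
                .username("flinkuser")
                .password("flinkpw")
                // serverId identifies this reader as a MySQL replica; use a range when parallelism > 1
                .serverId("5400-5404")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is what backs the connector's exactly-once guarantee
        env.enableCheckpointing(10_000L);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print();
        env.execute("mysql-cdc-sketch");
    }
}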

On Mon, Feb 7, 2022 at 3:03 PM Leonard Xu  wrote:

> Hello, mohan
>
> 1. Does flink have any support to track any missed source Jdbc CDC records
> ?
>
>
> The Flink CDC Connector provides exactly-once semantics, which means it won't
> miss records. Tip: the Flink JDBC Connector only
> scans the database once, so it cannot continuously read a CDC stream.
>
> 2. What is the equivalent of Kafka consumer groups ?
>
>
> Different databases have different CDC mechanisms: for MySQL/MariaDB it is the serverId used
> to mark a replica, for PostgreSQL it is the slot name.
>
>
> 3. Delivering to kafka from flink is not exactly once. Is that right ?
>
>
> No, both the Flink CDC Connector and the Flink Kafka Connector provide an exactly-once
> implementation.
>
> BTW, if your destination is Elasticsearch, the quick start demo[1] may
> help you.
>
> Best,
> Leonard
>
> [1]
> https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/mysql-postgres-tutorial.html
>
>
>
> Thanks
>
> On Friday, February 4, 2022, mohan radhakrishnan <
> radhakrishnan.mo...@gmail.com> wrote:
>
>> Hello,
>>So the JDBC source connector is Kafka and the transformation
>> is done by Flink (Flink SQL)? But that connector can miss records, I
>> thought. I started looking at Flink for this and other use cases.
>> Can I see the alternative to Spring Cloud Stream (Kafka Streams)? Since
>> I am learning Flink, Kafka Streams' changelog topics, exactly-once
>> delivery and DLQs seemed good for our critical push notifications.
>>
>> We also needed a  elastic  sink.
>>
>> Thanks
>>
>> On Friday, February 4, 2022, Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Mohan,
>>>
>>> I don't know much about Kafka Connect, so I will not talk about its
>>> features and differences to Flink. Flink on its own does not have a
>>> capability to read a CDC stream directly from a DB. However there is the
>>> flink-cdc-connectors[1] projects which embeds the standalone Debezium
>>> engine inside of Flink's source and can process DB changelog with all
>>> processing guarantees that Flink provides.
>>>
>>> As for the idea of processing further with Kafka Streams. Why not
>>> process data with Flink? What do you miss in Flink?
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1] https://github.com/ververica/flink-cdc-connectors
>>>
>>> On 04/02/2022 13:55, mohan radhakrishnan wrote:
>>>
 Hi,
  When I was looking for CDC I realized Flink uses Kafka Connector
 to stream to Flink. The idea is to send it forward to Kafka and consume it
 using Kafka Streams.

 Are there source DLQs or additional mechanisms to detect failures to
 read from the DB ?

 We don't want to use Debezium and our CDC is based on queries.

 What mechanisms does Flink have that a Kafka Connect worker does not ?
 Kafka Connect workers can go down and source data can be lost.

 Does the idea  to send it forward to Kafka and consume it using Kafka
 Streams make sense ? The checkpointing feature of Flink can help ? I plan
 to use Kafka Streams for 'Exactly-once Delivery' and changelog topics.

 Could you point out relevant material to read ?

 Thanks,
 Mohan

>>>
>


flink on yarn: job AM attempt fails after the HDFS_DELEGATION_TOKEN has been cleared

2022-02-10 Thread xieyi


Hi all,
I have a question.
Since a Hadoop delegation token expires and is removed once it exceeds its Max
Lifetime (7 days by default), YARN mentions three strategies for long-running services to deal with this: https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#securing-long-lived-yarn-services


I would like to know how Flink on YARN handles Hadoop delegation token expiry; the official documentation does not seem to explain this clearly.


We are currently seeing the following failure in production:
Flink 1.12 on YARN; the YARN NodeManagers are deployed in containers and occasionally crash and restart. Once a Flink
job has been running for more than 7 days, if the NodeManager hosting that job's JM (AM) restarts, the AM makes a new attempt (the attempt picks up token 1377, the one obtained at job submission, which has already been removed from the NameNode), and the attempt fails with:


Failing this attempt.Diagnostics: token (HDFS_DELEGATION_TOKEN token 1377 
for user***) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 1377for user***) can't be found in cache


Questions: after the Hadoop delegation token is cleared, how does Flink on YARN renew it? Does it generate a new token?
  If a new token is generated, why does the AM attempt still pick up the already-cleared token (1377)?
Is this failure related to the containerized NodeManager deployment, i.e. were the files holding the keytab wiped when the NodeManager restarted?




Illegal reflective access by org.apache.flink.api.java.ClosureCleaner

2022-02-10 Thread Антон
Hello, what could be the reason for a warning like this?

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/var/flink/flink-1.13.2/lib/flink-dist_2.12-1.13.2.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner

There Is a Delay While Over Aggregation Sending Results

2022-02-10 Thread wang guanglei
Hey Flink Community,

I am using a FlinkSQL OVER aggregation to calculate the number of distinct uuids per client IP during the past 1 hour.
The Flink SQL I am using is something like the query below:
SELECT
  COUNT(DISTINCT consumer_consumerUuid) OVER w AS feature_value,
  clientIp AS entity_id
FROM wide_table
WINDOW w AS (
  PARTITION BY clientIp
  ORDER BY ts
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
From the documentation, we know that OVER aggregates produce an aggregated
value for every input row, which means (in my view) that the calculation is
triggered by every input event in wide_table, not by the watermark?
However, judging from my logs, there is always a delay of about 5-60 seconds
between the input row and the result calculated by the window.

The data volume is small: there are only about 1k records/hour in table
wide_table and fewer than 10 consumers for each clientIp.

Is this delay normal? Or is there something wrong with the way it is
used?

Thanks.


Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

2022-02-10 Thread Fuyao Li
Hello Yun,

Thanks for the quick response. This is really helpful.

I have confirmed with Oracle Streaming Service (OSS) that it currently doesn't
support EXACTLY_ONCE semantics; only AT_LEAST_ONCE works. They suggest
adding some deduplication mechanism at the sink to mitigate the issue.

Question 1:
So the scenario should look like this:
When the Flink application restarts after it fails, it will start from the last
checkpoint offset. The messages that were processed after that checkpoint and before
the failure will be processed twice after the restart. Is there any chance
of data corruption here, for example breaking a window and sending out
incomplete records? I am using some session windows based on DataStream event-time
timers.

Question 2:
For the KafkaSource, I noticed that we don't have a place to configure the
semantic. Maybe enabling checkpointing with EXACTLY_ONCE should guarantee the
source's exactly-once semantics here? Please correct me if I am wrong.
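A minimal sketch of what enabling checkpointing with EXACTLY_ONCE looks like (the interval is a placeholder, not a job fragment from this thread):

// Sketch only: shows the relevant checkpoint call, not a full job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 60 s; EXACTLY_ONCE is also the default checkpointing mode.
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);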

Question 3:
To guarantee end-to-end exactly-once, I think we must make sure the sink is
exactly-once, right? Since OSS has this limitation, is it possible to achieve
effective EXACTLY_ONCE semantics through additional logic on the Flink side, since I
can't do much on the OSS side? Or is it technically impossible?
If it is possible, I think I should implement the Sink interface you mentioned.

Thank you very much for the help!
Fuyao


From: Yun Gao 
Date: Wednesday, February 9, 2022 at 23:17
To: Fuyao Li , user 
Subject: [External] : Re: Use TwoPhaseCommitSinkFunction or 
StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?
Hi Fuyao,

Logically, if a system wants to support end-to-end exactly-once,
it should support transactions:
1. A transaction holds the records; it gets pre-committed on snapshot state
and committed once the checkpoint succeeds.
2. A transaction should still be able to be aborted after it has been pre-committed.
3. Once pre-committed, a transaction must be committable, even if
the Flink job fails between pre-commit and commit; after the job restarts,
these transactions must be able to be committed again.

If the external system meets these conditions, then to implement an exactly-once sink
option b) is the more recommended one. However, these interfaces are newly added
in the upcoming 1.15, and it might be necessary to wait about 1.5 months before
that release.

An early version of option b) is org.apache.flink.api.connector.sink.Sink. It is
very similar to option b) and has been supported since 1.13. It will still be
supported in the next several releases, and it can also be migrated to option b)
easily.
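A minimal structural sketch of how these conditions map onto the existing TwoPhaseCommitSinkFunction (option 1 in the original mail); the OssTransaction type and the empty method bodies are placeholders, not a real OSS integration:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical handle for one transaction in the external system.
class OssTransaction implements Serializable {
    final List<String> bufferedRecords = new ArrayList<>();
}

public class OssTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<String, OssTransaction, Void> {

    public OssTwoPhaseCommitSink() {
        // The base class needs serializers for the transaction and the (unused) context type.
        super(new KryoSerializer<>(OssTransaction.class, new ExecutionConfig()),
                VoidSerializer.INSTANCE);
    }

    @Override
    protected OssTransaction beginTransaction() {
        // Open a new transaction for the next checkpoint interval.
        return new OssTransaction();
    }

    @Override
    protected void invoke(OssTransaction txn, String value, Context context) {
        // Records arriving between checkpoints are held by the currently open transaction.
        txn.bufferedRecords.add(value);
    }

    @Override
    protected void preCommit(OssTransaction txn) {
        // Called from snapshotState(): flush to the external system, but do not make data visible yet.
    }

    @Override
    protected void commit(OssTransaction txn) {
        // Called once the checkpoint completes; must be idempotent, because it can be
        // re-invoked for the same transaction after a restart (condition 3 above).
    }

    @Override
    protected void abort(OssTransaction txn) {
        // Discard a transaction that will never be committed (condition 2 above).
    }
}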

Best,
Yun


--Original Mail --
Sender:Fuyao Li 
Send Date:Thu Feb 10 07:01:51 2022
Recipients:user 
Subject:Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink 
to achieve EXACTLY_ONCE sink?
Hello Community,


I have two questions regarding Flink custom sink with EXACTLY_ONCE semantic.


  1.  I have an SDK that can publish messages over HTTP (backed by Oracle
Streaming Service --- very similar to Kafka). This will be my Flink
application's sink. Is it possible to use this SDK as a sink with EXACTLY_ONCE
semantics? HTTP is stateless here… If possible, what could be added to the SDK to
support EXACTLY_ONCE?
  2.  If it is possible for question 1, then I need to implement a custom sink 
for this. Which option should I use?

 *   Option 1: TwoPhaseCommitSinkFunction
 *   Option 2: StatefulSink + TwoPhaseCommittingSink

The legacy FlinkKafkaProducer seems to be using option (a). This will be
removed from Flink in the future. The new KafkaSink seems to be using option (b).
Based on the comment in the code, it seems option (a) is recommended, which one should I 

Re: question on dataSource.collect() on reading states from a savepoint file

2022-02-10 Thread Antonio Si
Thanks Bastien. I will check it out.

Antonio.

On Thu, Feb 10, 2022 at 11:59 AM bastien dine 
wrote:

> I haven't used s3 with Flink, but according to this doc :
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/
> You can setup pretty easily s3 and use it with s3://path/to/your/file with
> a write sink
> The page talk about DataStream but it should work with DataSet (
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/dataset/overview/#data-sinks
> )
>
> Maybe someone else will have more information about s3 dataset sink
>
> Regards,
>
>
> Le jeu. 10 févr. 2022 à 20:52, Antonio Si  a écrit :
>
>> Thanks Bastien. Can you point to an example of using a sink as we are
>> planning to write to S3?
>>
>> Thanks again for your help.
>>
>> Antonio.
>>
>> On Thu, Feb 10, 2022 at 11:49 AM bastien dine 
>> wrote:
>>
>>> Hello Antonio,
>>>
>>> .collect() method should be use with caution as it's collecting the
>>> DataSet (multiple partitions on multiple TM) into a List single list on JM
>>> (so in memory)
>>> Unless you have a lot of RAM, you can not use it this way and you
>>> probably should not
>>> I recommend you to use a sink to print it into a formatted file instead
>>> (like CSV one) or if it's too big, into something splittable
>>>
>>> Regards,
>>> Bastien
>>>
>>> --
>>>
>>> Bastien DINE
>>> Data Architect / Software Engineer / Sysadmin
>>> bastiendine.io
>>>
>>>
>>> Le jeu. 10 févr. 2022 à 20:32, Antonio Si  a
>>> écrit :
>>>
 Hi,

 I am using the stateful processing api to read the states from a
 savepoint file.
 It works fine when the state size is small, but when the state size is
 larger, around 11GB, I am getting an OOM. I think it happens when it is
 doing a dataSource.collect() to obtain the states. The stackTrace is copied
 at the end of the message.

 Any suggestions or hints would be very helpful.

 Thanks in advance.

 Antonio.

 java.lang.OutOfMemoryError: null
 at
 java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
 ~[?:1.8.0_282]
 at
 java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
 ~[?:1.8.0_282]
 at
 java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
 ~[?:1.8.0_282]
 at
 java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
 ~[?:1.8.0_282]
 at
 java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
 ~[?:1.8.0_282]
 at
 java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
 ~[?:1.8.0_282]
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
 ~[?:1.8.0_282]
 at
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
 ~[?:1.8.0_282]
 at
 org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
 at
 org.apache.flink.util.SerializedValue.(SerializedValue.java:62)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
 at
 org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
 at
 org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
 at
 java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
 ~[?:1.8.0_282]
 at
 java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
 ~[?:1.8.0_282]
 at
 java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
 ~[?:1.8.0_282]
 at
 java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
 ~[?:1.8.0_282]
 at
 java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
 ~[?:1.8.0_282]
 at
 java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 ~[?:1.8.0_282]
 at
 java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 ~[?:1.8.0_282]
 at
 java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
 ~[?:1.8.0_282]
 at
 org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
 at
 org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
 at
 org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2]
 at
 

Re: question on dataSource.collect() on reading states from a savepoint file

2022-02-10 Thread bastien dine
I haven't used S3 with Flink, but according to this doc:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/
you can set up S3 pretty easily and use it with s3://path/to/your/file and
a write sink.
The page talks about DataStream, but it should work with DataSet (
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/dataset/overview/#data-sinks
)

Maybe someone else will have more information about an S3 DataSet sink.
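An illustrative sketch of what such a write sink could look like instead of collect(); the bucket path and parallelism are placeholders, and `dataSource`/`env` stand for the DataSet and ExecutionEnvironment from Antonio's savepoint-reading job:

import org.apache.flink.core.fs.FileSystem;

// Instead of:  List<...> all = dataSource.collect();   // materializes everything on the JM
dataSource
        .writeAsText("s3://my-bucket/state-dump", FileSystem.WriteMode.OVERWRITE)
        .setParallelism(8);
env.execute("dump keyed state to s3");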

Regards,


Le jeu. 10 févr. 2022 à 20:52, Antonio Si  a écrit :

> Thanks Bastien. Can you point to an example of using a sink as we are
> planning to write to S3?
>
> Thanks again for your help.
>
> Antonio.
>
> On Thu, Feb 10, 2022 at 11:49 AM bastien dine 
> wrote:
>
>> Hello Antonio,
>>
>> .collect() method should be use with caution as it's collecting the
>> DataSet (multiple partitions on multiple TM) into a List single list on JM
>> (so in memory)
>> Unless you have a lot of RAM, you can not use it this way and you
>> probably should not
>> I recommend you to use a sink to print it into a formatted file instead
>> (like CSV one) or if it's too big, into something splittable
>>
>> Regards,
>> Bastien
>>
>> --
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
>>
>>
>> Le jeu. 10 févr. 2022 à 20:32, Antonio Si  a
>> écrit :
>>
>>> Hi,
>>>
>>> I am using the stateful processing api to read the states from a
>>> savepoint file.
>>> It works fine when the state size is small, but when the state size is
>>> larger, around 11GB, I am getting an OOM. I think it happens when it is
>>> doing a dataSource.collect() to obtain the states. The stackTrace is copied
>>> at the end of the message.
>>>
>>> Any suggestions or hints would be very helpful.
>>>
>>> Thanks in advance.
>>>
>>> Antonio.
>>>
>>> java.lang.OutOfMemoryError: null
>>> at
>>> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>>> ~[?:1.8.0_282]
>>> at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> ~[?:1.8.0_282]
>>> at
>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.util.SerializedValue.(SerializedValue.java:62)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>>> ~[?:1.8.0_282]
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085)
>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131)
>>> 

Re: question on dataSource.collect() on reading states from a savepoint file

2022-02-10 Thread Antonio Si
Thanks Bastien. Can you point to an example of using a sink as we are
planning to write to S3?

Thanks again for your help.

Antonio.

On Thu, Feb 10, 2022 at 11:49 AM bastien dine 
wrote:

> Hello Antonio,
>
> .collect() method should be use with caution as it's collecting the
> DataSet (multiple partitions on multiple TM) into a List single list on JM
> (so in memory)
> Unless you have a lot of RAM, you can not use it this way and you probably
> should not
> I recommend you to use a sink to print it into a formatted file instead
> (like CSV one) or if it's too big, into something splittable
>
> Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
> Le jeu. 10 févr. 2022 à 20:32, Antonio Si  a écrit :
>
>> Hi,
>>
>> I am using the stateful processing api to read the states from a
>> savepoint file.
>> It works fine when the state size is small, but when the state size is
>> larger, around 11GB, I am getting an OOM. I think it happens when it is
>> doing a dataSource.collect() to obtain the states. The stackTrace is copied
>> at the end of the message.
>>
>> Any suggestions or hints would be very helpful.
>>
>> Thanks in advance.
>>
>> Antonio.
>>
>> java.lang.OutOfMemoryError: null
>> at
>> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>> ~[?:1.8.0_282]
>> at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> ~[?:1.8.0_282]
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> ~[?:1.8.0_282]
>> at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> ~[?:1.8.0_282]
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>> ~[?:1.8.0_282]
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>> ~[?:1.8.0_282]
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>> ~[?:1.8.0_282]
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> ~[?:1.8.0_282]
>> at
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.util.SerializedValue.(SerializedValue.java:62)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>> ~[?:1.8.0_282]
>> at
>> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>> ~[?:1.8.0_282]
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1356)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> 

Re: question on dataSource.collect() on reading states from a savepoint file

2022-02-10 Thread bastien dine
Hello Antonio,

The .collect() method should be used with caution, as it collects the DataSet
(multiple partitions on multiple TMs) into a single list on the JM (so in
memory).
Unless you have a lot of RAM, you cannot use it this way, and you probably
should not.
I recommend using a sink to write it into a formatted file instead
(like a CSV one) or, if it's too big, into something splittable.

Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


Le jeu. 10 févr. 2022 à 20:32, Antonio Si  a écrit :

> Hi,
>
> I am using the stateful processing api to read the states from a savepoint
> file.
> It works fine when the state size is small, but when the state size is
> larger, around 11GB, I am getting an OOM. I think it happens when it is
> doing a dataSource.collect() to obtain the states. The stackTrace is copied
> at the end of the message.
>
> Any suggestions or hints would be very helpful.
>
> Thanks in advance.
>
> Antonio.
>
> java.lang.OutOfMemoryError: null
> at
> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> ~[?:1.8.0_282]
> at
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> ~[?:1.8.0_282]
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> ~[?:1.8.0_282]
> at
> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> ~[?:1.8.0_282]
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> ~[?:1.8.0_282]
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> ~[?:1.8.0_282]
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> ~[?:1.8.0_282]
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[?:1.8.0_282]
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.util.SerializedValue.(SerializedValue.java:62)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> ~[?:1.8.0_282]
> at
> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> ~[?:1.8.0_282]
> at
> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
> ~[?:1.8.0_282]
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> ~[?:1.8.0_282]
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> ~[?:1.8.0_282]
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> ~[?:1.8.0_282]
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> ~[?:1.8.0_282]
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> ~[?:1.8.0_282]
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1356)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>

question on dataSource.collect() on reading states from a savepoint file

2022-02-10 Thread Antonio Si
Hi,

I am using the stateful processing api to read the states from a savepoint
file.
It works fine when the state size is small, but when the state size is
larger, around 11GB, I am getting an OOM. I think it happens when it is
doing a dataSource.collect() to obtain the states. The stackTrace is copied
at the end of the message.

Any suggestions or hints would be very helpful.

Thanks in advance.

Antonio.

java.lang.OutOfMemoryError: null
at
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
~[?:1.8.0_282]
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
~[?:1.8.0_282]
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
~[?:1.8.0_282]
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
~[?:1.8.0_282]
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
~[?:1.8.0_282]
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
~[?:1.8.0_282]
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
~[?:1.8.0_282]
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
~[?:1.8.0_282]
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.util.SerializedValue.(SerializedValue.java:62)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
~[?:1.8.0_282]
at
java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
~[?:1.8.0_282]
at
java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
~[?:1.8.0_282]
at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
~[?:1.8.0_282]
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
~[?:1.8.0_282]
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
~[?:1.8.0_282]
at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
~[?:1.8.0_282]
at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
~[?:1.8.0_282]
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1356)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

Re: Does Flink support HTTP requests and returning JSON data?

2022-02-10 Thread yckkcy
Hi, here is an asynchronous approach that came to mind; I'm not sure whether it fully meets the requirement, but it may serve as a reference:
1. On the receiving side, I agree with Caizhi Weng: you still need a separate HTTP server, service A, which puts the request parameters into a message queue and at the same time tells the caller that the request has been received.
2. Flink consumes the data from the message queue, parses the parameters, and runs the business logic.
3. On the sink side, you can extend RichSinkFunction and send the parsed result back by calling the caller's HTTP service; just POST the data in the invoke method.

The sink side can follow code along these lines:
public class HttpResultSink extends RichSinkFunction<RequesResult> {
    private CloseableHttpClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = HttpClients.createDefault();
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void invoke(RequesResult requesResult, Context context) throws Exception {
        // Build the request
        HttpPost httpPost = new HttpPost("http://yourip:8088/open/api/pushInfo");
        // Build the headers and the body
        String json = JSON.toJSONString(requesResult);
        httpPost.setHeader("Accept", "application/json");
        httpPost.setHeader("Content-type", "application/json; charset=UTF-8");
        httpPost.setEntity(new StringEntity(json, StandardCharsets.UTF_8));
        // Send the request
        CloseableHttpResponse response = client.execute(httpPost);
        response.close();
    }
}

If you want Flink itself to accept the HTTP request, process it, and return the result synchronously, all within one job, I think that is quite difficult; such a synchronous scheme is not a good fit for Flink and is better handled by something like the Spring stack.
杨成凯
yangchengkai2...@163.com
github: forrestlmj
On Feb 9, 2022 at 17:30, 张锴 wrote:
I think this approach is feasible. Could you tell me how I should go about it, and is there any reference material I could consult?

On Wed, Feb 9, 2022 at 16:15, Caizhi Weng wrote:

Hi!

Flink currently has no HTTP server source / sink. Is this an OLAP
requirement? Judging from the requirement described, a more reasonable approach would be a dedicated
HTTP server that accepts the request, calls the Flink API to run a Flink
job (Flink SQL can run select statements), and then returns the result to the caller.

On Wed, Feb 9, 2022 at 14:28, 张锴 wrote:



Business requirement: pass parameters to Flink via an HTTP request, feed them into the Flink program, and return the result as JSON. Does Flink support real-time computation done in this way?

Flink version: 1.12.1




Problem with kafka with key=None using pyhton-kafka module

2022-02-10 Thread mrAlexTFB
Hello,

I am following the example in the Python Walkthrough, and
I downloaded the zip file with the project skeleton. I'm having a problem
when changing the key attribute in the producer.send call to None.
From:

def produce():
    if len(sys.argv) == 2:
        delay_seconds = int(sys.argv[1])
    else:
        delay_seconds = 1
    producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
    for request in random_requests():
        key = request.name.encode('utf-8')
        val = request.SerializeToString()
        producer.send(topic='names', key=key, value=val)
        producer.flush()
        time.sleep(delay_seconds)

To:

def produce():
    if len(sys.argv) == 2:
        delay_seconds = int(sys.argv[1])
    else:
        delay_seconds = 1
    producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
    for request in random_requests():
        key = request.name.encode('utf-8')
        val = request.SerializeToString()
        producer.send(topic='names', key=None, value=val)
        producer.flush()
        time.sleep(delay_seconds)

After doing this, the consumer is not displaying anything.

I modified the Python code so that each arriving message is printed, and nothing is
being printed here; I suppose the problem could be a bad configuration
in the module.yaml file?

I understand that by putting key=None the topic partition will be chosen
randomly, which is the behaviour I was aiming for, as I do not need any
ordering of the messages.

Do I need any additional configuration in this walkthrough to achieve this?

Thank you very much in advance.


Re: Issue with Flink UI for Flink 1.14.0

2022-02-10 Thread Guillaume Vauvert

Hi,

This issue is impacting all deployments with 2 or more JobManagers (HA
mode), because in that case serialization is used (well, depending on
which JobManager responds, the leader or a follower).


It prevents:

* usage of Flink UI

* usage of Flink command "flink.sh list"

* usage of Flink REST API "/jobs/overview"

There are workarounds for all of these impacts, but they require additional work,
so the impact is significant.


Would it be possible to release sooner than "planned"?

Thanks !

--

Guillaume

On 10/02/2022 11.35, Roman Khachatryan wrote:

Hi,

AFAIK there are no plans currently to release 1.14.4.
The previous one (1.14.3) was released on Jan 20, so I'd expect
1.14.4 preparation to start in the next several weeks.


Regards,
Roman


On Tue, Feb 8, 2022 at 7:31 PM Sweta Kalakuntla 
 wrote:


I am facing the same issue, do we know when 1.14.4 will be released?

Thanks.

On Fri, Jan 21, 2022 at 3:28 AM Chesnay Schepler
 wrote:

While FLINK-24550 was indeed fixed unfortunately a similar bug
was also introduced
(https://issues.apache.org/jira/browse/FLINK-25732).

On 20/01/2022 21:18, Peter Westermann wrote:


Just tried this again with Flink 1.14.3 since
https://issues.apache.org/jira/browse/FLINK-24550 is listed
as fixed. I am running into similar errors when calling the
/v1/jobs/overview endpoint (without any running jobs):

{"errors":["Internal server error.",""]}

Peter Westermann

Team Lead – Realtime Analytics

peter.westerm...@genesys.com

*From: *Dawid Wysakowicz 

*Date: *Thursday, October 14, 2021 at 10:00 AM
*To: *Peter Westermann 
, user@flink.apache.org
 
*Subject: *Re: Issue with Flink UI for Flink 1.14.0

I am afraid it is a bug in flink 1.14. I created a ticket for
it FLINK-24550[1]. I believe we should pick it up soonish.
Thanks for reporting the issue!

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-24550

On 13/10/2021 20:32, Peter Westermann wrote:

Hello,

I just started testing Flink 1.14.0 and noticed some
weird behavior. This is for a Flink cluster with
zookeeper for HA and two job managers (one leader, one
backup). The UI on the leader works fine. The UI on the
other job manager does not load any job-specific data.
Same applies to the REST interface. If I requests job
data from /v1/jobs/{jobId}, I get the expected response
on the leader but on the other job manager, I only get an
exception stack trace:

{"errors":["Internal server error.",""]}

Peter Westermann

Team Lead – Realtime Analytics

peter.westerm...@genesys.com



Re: Issue with Flink UI for Flink 1.14.0

2022-02-10 Thread Roman Khachatryan
Hi,

AFAIK there are no plans currently to release 1.14.4.
The previous one (1.14.3) was released on Jan 20, so I'd expect 1.14.4 preparation
to start in the next several weeks.

Regards,
Roman


On Tue, Feb 8, 2022 at 7:31 PM Sweta Kalakuntla 
wrote:

> I am facing the same issue, do we know when 1.14.4 will be released?
>
> Thanks.
>
> On Fri, Jan 21, 2022 at 3:28 AM Chesnay Schepler 
> wrote:
>
>> While FLINK-24550 was indeed fixed unfortunately a similar bug was also
>> introduced (https://issues.apache.org/jira/browse/FLINK-25732).
>> On 20/01/2022 21:18, Peter Westermann wrote:
>>
>> Just tried this again with Flink 1.14.3 since
>> https://issues.apache.org/jira/browse/FLINK-24550 is listed as fixed. I
>> am running into similar errors when calling the /v1/jobs/overview endpoint
>> (without any running jobs):
>>
>> {"errors":["Internal server error.","> side:\norg.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException:
>> Failed to serialize the result for RPC call :
>> requestMultipleJobDetails.\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)\n\tat
>> java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat
>> java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:946)\n\tat
>> java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2266)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)\n\tat
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)\n\tat
>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\n\tat
>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\n\tat
>> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat
>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat
>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\n\tat
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
>> akka.actor.Actor.aroundReceive(Actor.scala:537)\n\tat
>> akka.actor.Actor.aroundReceive$(Actor.scala:535)\n\tat
>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\n\tat
>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)\n\tat
>> akka.actor.ActorCell.invoke(ActorCell.scala:548)\n\tat
>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\n\tat
>> akka.dispatch.Mailbox.run(Mailbox.scala:231)\n\tat
>> akka.dispatch.Mailbox.exec(Mailbox.scala:243)\n\tat
>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)\n\tat
>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)\n\tat
>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)\n\tat
>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)\n\tat
>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)\nCaused
>> by: java.io.NotSerializableException: java.util.HashMap$Values\n\tat
>> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)\n\tat
>> java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)\n\tat
>> java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)\n\tat
>> java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)\n\tat
>> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)\n\tat
>> java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)\n\tat
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)\n\tat
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:400)\n\t...
>> 30 more\n\nEnd of exception on server side>"]}
>>
>>
>>
>>
>>
>>
>>
>> Peter Westermann
>>
>> Team Lead – Realtime Analytics
>>
>> peter.westerm...@genesys.com
>>
>>
>>
>>
>>
>> *From: *Dawid Wysakowicz 
>> 
>> *Date: *Thursday, October 14, 2021 at 10:00 AM
>> *To: *Peter Westermann 
>> , user@flink.apache.org
>>  
>> *Subject: *Re: 

Re: JSONKeyValueDeserializationSchema cannot be converted to ObjectNode>

2022-02-10 Thread Martijn Visser
Thanks for sharing the full solution, much appreciated!

On Thu, 10 Feb 2022 at 09:07, HG  wrote:

> For the record, the complete solution (so that others can benefit from it):
>
> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
> .setProperties(kafkaProps)
> .setProperty("ssl.truststore.type",trustStoreType)
> .setProperty("ssl.truststore.password",trustStorePassword)
> .setProperty("ssl.truststore.location",trustStoreLocation)
> .setProperty("security.protocol",securityProtocol)
> .setProperty("partition.discovery.interval.ms", 
> partitionDiscoveryIntervalMs)
> .setProperty("commit.offsets.on.checkpoint", 
> commitOffsetsOnCheckpoint)
> .setGroupId(groupId)
> .setTopics(kafkaInputTopic)
> .setDeserializer(KafkaRecordDeserializationSchema.of(new 
> JSONKeyValueDeserializationSchema(fetchMetadata)))
> 
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> .build();
>
>
> Op wo 9 feb. 2022 om 09:46 schreef HG :
>
>> Sorry to have bothered everyone.
>>
>> This is the obvious solution:
>>
>> .setDeserializer(KafkaRecordDeserializationSchema.of(new 
>> JSONKeyValueDeserializationSchema(false)))
>>
>>
>> Regards Hans-Peter
>>
>>
>> Op di 8 feb. 2022 om 21:56 schreef Roman Khachatryan :
>>
>>> Hi,
>>>
>>> setDeserializer() expects KafkaRecordDeserializationSchema;
>>> JSONKeyValueDeserializationSchema you provided is not compatible with
>>> it.
>>> You can convert it using [1]
>>>
>>> [1]
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema-
>>>
>>>
>>> Regards,
>>> Roman
>>>
>>> On Tue, Feb 8, 2022 at 5:43 PM HG  wrote:
>>> >
>>> > Hi all,
>>> >
>>> > When I build this code:
>>> >
>>> > KafkaSource source = KafkaSource.builder()
>>> > .setProperties(kafkaProps)
>>> > .setProperty("ssl.truststore.type",trustStoreType)
>>> > .setProperty("ssl.truststore.password",trustStorePassword)
>>> > .setProperty("ssl.truststore.location",trustStoreLocation)
>>> > .setProperty("security.protocol",securityProtocol)
>>> > .setProperty("partition.discovery.interval.ms",
>>> partitionDiscoveryIntervalMs)
>>> > .setProperty("commit.offsets.on.checkpoint",
>>> commitOffsetsOnCheckpoint)
>>> > .setGroupId(groupId)
>>> > .setTopics(kafkaInputTopic)
>>> > .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>>> >
>>>  
>>> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>> > .build();
>>> >
>>> >
>>> > I get:
>>> > This error:
>>> >
>>> > error: incompatible types:
>>> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
>>> cannot be converted to
>>> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
>>> > .setDeserializer(new
>>> JSONKeyValueDeserializationSchema(false))
>>> >
>>> >
>>> > What am I doing wrong?
>>> > As per the documentation JSONKeyValueDeserializationSchema returns an
>>> ObjectNode.
>>> >
>>> > Regards Hans
>>> >
>>>
>>


Re: JSONKeyValueDeserializationSchema cannot be converted to ObjectNode>

2022-02-10 Thread HG
For the record, the complete solution (so that others can benefit from it):

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type",trustStoreType)
.setProperty("ssl.truststore.password",trustStorePassword)
.setProperty("ssl.truststore.location",trustStoreLocation)
.setProperty("security.protocol",securityProtocol)
.setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new
JSONKeyValueDeserializationSchema(fetchMetadata)))

.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();


Op wo 9 feb. 2022 om 09:46 schreef HG :

> Sorry to have bothered everyone.
>
> This is the obvious solution:
>
> .setDeserializer(KafkaRecordDeserializationSchema.of(new 
> JSONKeyValueDeserializationSchema(false)))
>
>
> Regards Hans-Peter
>
>
> Op di 8 feb. 2022 om 21:56 schreef Roman Khachatryan :
>
>> Hi,
>>
>> setDeserializer() expects KafkaRecordDeserializationSchema;
>> JSONKeyValueDeserializationSchema you provided is not compatible with
>> it.
>> You can convert it using [1]
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html#of-org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema-
>>
>>
>> Regards,
>> Roman
>>
>> On Tue, Feb 8, 2022 at 5:43 PM HG  wrote:
>> >
>> > Hi all,
>> >
>> > When I build this code:
>> >
>> > KafkaSource source = KafkaSource.builder()
>> > .setProperties(kafkaProps)
>> > .setProperty("ssl.truststore.type",trustStoreType)
>> > .setProperty("ssl.truststore.password",trustStorePassword)
>> > .setProperty("ssl.truststore.location",trustStoreLocation)
>> > .setProperty("security.protocol",securityProtocol)
>> > .setProperty("partition.discovery.interval.ms",
>> partitionDiscoveryIntervalMs)
>> > .setProperty("commit.offsets.on.checkpoint",
>> commitOffsetsOnCheckpoint)
>> > .setGroupId(groupId)
>> > .setTopics(kafkaInputTopic)
>> > .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>> >
>>  
>> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>> > .build();
>> >
>> >
>> > I get:
>> > This error:
>> >
>> > error: incompatible types:
>> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
>> cannot be converted to
>> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
>> > .setDeserializer(new
>> JSONKeyValueDeserializationSchema(false))
>> >
>> >
>> > What am I doing wrong?
>> > As per the documentation JSONKeyValueDeserializationSchema returns an
>> ObjectNode.
>> >
>> > Regards Hans
>> >
>>
>