Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-28 Thread Guowei Ma
Congratulations!

Best,
Guowei


On Tue, Mar 28, 2023 at 12:02 PM Yuxin Tan  wrote:

> Congratulations!
>
> Best,
> Yuxin
>
>
> Guanghui Zhang  于2023年3月28日周二 11:06写道:
>
>> Congratulations!
>>
>> Best,
>> Zhang Guanghui
>>
>> Hang Ruan  于2023年3月28日周二 10:29写道:
>>
>> > Congratulations!
>> >
>> > Best,
>> > Hang
>> >
>> > yu zelin  于2023年3月28日周二 10:27写道:
>> >
>> >> Congratulations!
>> >>
>> >> Best,
>> >> Yu Zelin
>> >>
>> >> 2023年3月27日 17:23,Yu Li  写道:
>> >>
>> >> Dear Flinkers,
>> >>
>> >>
>> >>
>> >> As you may have noticed, we are pleased to announce that Flink Table
>> Store has joined the Apache Incubator as a separate project called Apache
>> Paimon(incubating) [1] [2] [3]. The new project still aims at building a
>> streaming data lake platform for high-speed data ingestion, change data
>> tracking and efficient real-time analytics, with the vision of supporting a
>> larger ecosystem and establishing a vibrant and neutral open source
>> community.
>> >>
>> >>
>> >>
>> >> We would like to thank everyone for their great support and efforts
>> for the Flink Table Store project, and warmly welcome everyone to join the
>> development and activities of the new project. Apache Flink will continue
>> to be one of the first-class citizens supported by Paimon, and we believe
>> that the Flink and Paimon communities will maintain close cooperation.
>> >>
>> >>
>> >> 亲爱的Flinkers,
>> >>
>> >>
>> >> 正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache
>> >> 孵化器独立孵化 [1] [2] [3]。新项目的名字是
>> >> Apache
>> Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。
>> >>
>> >>
>> >> 在这里我们要感谢大家对 Flink Table Store
>> >> 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache Flink 将继续作为 Paimon
>> 支持的主力计算引擎之一,我们也相信
>> >> Flink 和 Paimon 社区将继续保持密切合作。
>> >>
>> >>
>> >> Best Regards,
>> >> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>> >>
>> >> 致礼,
>> >> 李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)
>> >>
>> >> [1] https://paimon.apache.org/
>> >> [2] https://github.com/apache/incubator-paimon
>> >> [3]
>> https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
>> >>
>> >>
>> >>
>>
>


Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-28 Thread Guowei Ma
Congratulations!

Best,
Guowei


On Tue, Mar 28, 2023 at 12:02 PM Yuxin Tan  wrote:

> Congratulations!
>
> Best,
> Yuxin
>
>
> Guanghui Zhang  于2023年3月28日周二 11:06写道:
>
>> Congratulations!
>>
>> Best,
>> Zhang Guanghui
>>
>> Hang Ruan  于2023年3月28日周二 10:29写道:
>>
>> > Congratulations!
>> >
>> > Best,
>> > Hang
>> >
>> > yu zelin  于2023年3月28日周二 10:27写道:
>> >
>> >> Congratulations!
>> >>
>> >> Best,
>> >> Yu Zelin
>> >>
>> >> 2023年3月27日 17:23,Yu Li  写道:
>> >>
>> >> Dear Flinkers,
>> >>
>> >>
>> >>
>> >> As you may have noticed, we are pleased to announce that Flink Table
>> Store has joined the Apache Incubator as a separate project called Apache
>> Paimon(incubating) [1] [2] [3]. The new project still aims at building a
>> streaming data lake platform for high-speed data ingestion, change data
>> tracking and efficient real-time analytics, with the vision of supporting a
>> larger ecosystem and establishing a vibrant and neutral open source
>> community.
>> >>
>> >>
>> >>
>> >> We would like to thank everyone for their great support and efforts
>> for the Flink Table Store project, and warmly welcome everyone to join the
>> development and activities of the new project. Apache Flink will continue
>> to be one of the first-class citizens supported by Paimon, and we believe
>> that the Flink and Paimon communities will maintain close cooperation.
>> >>
>> >>
>> >> 亲爱的Flinkers,
>> >>
>> >>
>> >> 正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache
>> >> 孵化器独立孵化 [1] [2] [3]。新项目的名字是
>> >> Apache
>> Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。
>> >>
>> >>
>> >> 在这里我们要感谢大家对 Flink Table Store
>> >> 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache Flink 将继续作为 Paimon
>> 支持的主力计算引擎之一,我们也相信
>> >> Flink 和 Paimon 社区将继续保持密切合作。
>> >>
>> >>
>> >> Best Regards,
>> >> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>> >>
>> >> 致礼,
>> >> 李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)
>> >>
>> >> [1] https://paimon.apache.org/
>> >> [2] https://github.com/apache/incubator-paimon
>> >> [3]
>> https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
>> >>
>> >>
>> >>
>>
>


Re: Broadcast State + Stateful Operator + Async IO

2022-04-29 Thread Guowei Ma
Hi Vishal

If your scenario is to update the data in full every time. One idea is to
rerun the job every time. For example, you have an
`EnrichWithDatabaseAndWebSerivce` job, which is responsible for reading all
data from a data source every time, and then joins the data with DB and Web
services. Every time you need to re-enrich you have to start the job again.

Also, can you briefly describe what the frequency is?

Best,
Guowei


On Fri, Apr 29, 2022 at 2:20 PM Vishal Surana  wrote:

> Yes. You have explained my requirements exactly as they are. My operator
> will talk to multiple databases and a couple of web services to enrich
> incoming input streams. I cannot think of a way to use the async IO
> operator. So I thought maybe convert these 7-10 calls into async calls and
> chain the Futures together. I believe I have to block once in the end of
> the KeyedBroadcastProcessFunction but if there's a way to avoid that also
> while also ensuring ordered processing of events, then do let me know.
>
> On Fri, Apr 29, 2022 at 7:35 AM Guowei Ma  wrote:
>
>> Hi Vishal
>>
>> I want to understand your needs first. Your requirements are: After a
>> stateful operator receives a notification, it needs to traverse all the
>> data stored in the operator state, communicate with an external system
>> during the traversal process (maybe similar to join?). In order to improve
>> the efficiency of  this behavior, you want to take an asynchronous
>> approach. That is, if you modify the state of different keys, do not block
>> each other due to external communication.
>> If I understand correctly, according to the existing function of
>> KeyedBroadcastProcessFunction, it is really impossible.
>> As for whether there are other solutions, it may depend on specific
>> scenarios, such as what kind of external system. So could you describe in
>> detail what scenario has this requirement, and what are the external
>> systems it depends on?
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Apr 29, 2022 at 12:42 AM Vishal Surana 
>> wrote:
>>
>>> Hello,
>>> My application has a stateful operator which leverages RocksDB to store
>>> a large amount of state. It, along with other operators receive
>>> configuration as a broadcast stream (KeyedBroadcastProcessFunction). The
>>> operator depends upon another input stream that triggers some communication
>>> with external services whose results are then combined to yield the state
>>> that gets stored in RocksDB.
>>>
>>> In order to make the application more efficient, I am going to switch to
>>> asynchronous IO but as the result is ultimately going to be a (Scala)
>>> Future, I will have to block once to get the result. I was hoping to
>>> leverage the Async IO operator but that apparently doesn't support RocksDB
>>> based state storage. Am I correct in saying
>>> that KeyedBroadcastProcessFunction is the only option I have? If so, then I
>>> want to understand how registering a future's callbacks (via onComplete)
>>> works with a synchronous operator such as KeyedBroadcastProcessFunction.
>>> Will the thread executing the function simply relinquish control to some
>>> other subtask while the results of the external services are being awaited?
>>> Will the callback eventually be triggered automatically or will I have to
>>> explicitly block on the result future like so: Await.result(f, timeout).
>>>
>>> --
>>> Regards,
>>> Vishal
>>>
>>
>
> --
> Regards,
> Vishal
>


Re: Broadcast State + Stateful Operator + Async IO

2022-04-28 Thread Guowei Ma
Hi Vishal

I want to understand your needs first. Your requirements are: After a
stateful operator receives a notification, it needs to traverse all the
data stored in the operator state, communicate with an external system
during the traversal process (maybe similar to join?). In order to improve
the efficiency of  this behavior, you want to take an asynchronous
approach. That is, if you modify the state of different keys, do not block
each other due to external communication.
If I understand correctly, according to the existing function of
KeyedBroadcastProcessFunction, it is really impossible.
As for whether there are other solutions, it may depend on specific
scenarios, such as what kind of external system. So could you describe in
detail what scenario has this requirement, and what are the external
systems it depends on?

Best,
Guowei


On Fri, Apr 29, 2022 at 12:42 AM Vishal Surana  wrote:

> Hello,
> My application has a stateful operator which leverages RocksDB to store a
> large amount of state. It, along with other operators receive configuration
> as a broadcast stream (KeyedBroadcastProcessFunction). The operator depends
> upon another input stream that triggers some communication with external
> services whose results are then combined to yield the state that gets
> stored in RocksDB.
>
> In order to make the application more efficient, I am going to switch to
> asynchronous IO but as the result is ultimately going to be a (Scala)
> Future, I will have to block once to get the result. I was hoping to
> leverage the Async IO operator but that apparently doesn't support RocksDB
> based state storage. Am I correct in saying
> that KeyedBroadcastProcessFunction is the only option I have? If so, then I
> want to understand how registering a future's callbacks (via onComplete)
> works with a synchronous operator such as KeyedBroadcastProcessFunction.
> Will the thread executing the function simply relinquish control to some
> other subtask while the results of the external services are being awaited?
> Will the callback eventually be triggered automatically or will I have to
> explicitly block on the result future like so: Await.result(f, timeout).
>
> --
> Regards,
> Vishal
>


Re: Checkpoint Timeout Troubleshooting

2022-04-28 Thread Guowei Ma
Hi Sam

I think the first step is to see which part of your Flink APP is blocking
the completion of Checkpoint. Specifically, you can refer to the
"Checkpoint Details" section of the document [1]. Using these methods, you
should be able to observe where the checkpoint is blocked, for example, it
may be an agg operator of the app, or it may be blocked on the sink of
kafka.
Once you know which operator is blocking, you can use FlameGraph [2] to see
where the bottleneck of the operator is. Then do specific operations.

Hope these help!
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/#checkpoint-details
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/

Best,
Guowei


On Fri, Apr 29, 2022 at 2:10 AM Sam Ch  wrote:

> Hello,
>
> I am running into checkpoint timeouts and am looking for guidance on
> troubleshooting. What should I be looking at? What configuration parameters
> would affect this? I am afraid I am a Flink newbie so I am still picking up
> the concepts. Additional notes are below, anything else I can provide?
> Thanks.
>
>
> The checkpoint size is small (less than 100kB)
> Multiple flink apps are running on a cluster, only one is running into
> checkpoint timeouts
> Timeout is set to 10 mins
> Tried aligned and unaligned checkpoints
> Tried clearing checkpoints to start fresh
> Plenty of disk space
> Dataflow: kafka source -> flink app -> kafka sink
>


Re: Write to Aliyun OSS via FileSystem connector hang Job Master on Finishing

2022-04-25 Thread Guowei Ma
Hi

Afaik the commit files action happens at the committer operator instead of
the JM size after the new sink api [1].

It means this would not happen if you use the new `FlinkSink`.[2]

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java

Best,
Guowei


On Sun, Apr 24, 2022 at 11:39 AM Yi Tang  wrote:

>
>
> -- Forwarded message -
> From: Yi Tang 
> Date: Sun, Apr 24, 2022 at 11:29 AM
> Subject: Write to Aliyun OSS via FileSystem connector hang Job Master on
> Finishing
> To: 
>
>
> Hi team;
>
> I'm trying to write to aliyun oss via FileSystem connector. The job master
> always hangs on finishing.
>
> Looks like it is because the FileSystem connector commits the files by
> #finalizeGlobal while the Job is finishing, which includes some rename
> operations. However, the aliyun oss FileSystem renames files by copying,
> which seems expensive.
>
> Any suggestions about this scenario?
>
> Thanks and regards.
>
>


Re: Handling non-transient exceptions

2022-04-19 Thread Guowei Ma
Hi Jose

In some scenarios I think it would make sense to optionally allow the job
going on even if there are some exceptions.

But IMHO the scenario might be more likely to debug something but the
production. And in my own limited experience most of user actually could do
this themself i.e. Dealing with the exception when using `MapFunction`,
which only the developer could give the default "value(s)" to the  system.


So I would like to leave it outside the Flink.  But please correct me if  I
miss something.

Best,
Guowei


On Mon, Apr 18, 2022 at 5:54 PM Jose Brandao  wrote:

> Hello,
>
>
>
> Thank you for your answer. Yes, we are using the DataStream API.
>
>
> I agree that exceptions are developer’s responsibility but errors can
> still happen and I would like to have a progressive approach in case they
> happen instead of a blocking one.
>
>
>
> I will take a look at your suggestion. Wouldn’t it make sense to
> optionally allowing to move into the next message in case of an unpredicted
> exception happens instead of only killing the tasks and wait for a restart?
> I know that in some cases those exceptions might cause irreparable damage
> to applications but it could be configured per exception.
>
>
>
>
>
> Regards,
>
> José Brandão
>
>
>
> *From: *Guowei Ma 
> *Date: *Friday, 15 April 2022 at 11:04
> *To: *Jose Brandao 
> *Cc: *user@flink.apache.org 
> *Subject: *Re: Handling non-transient exceptions
>
>
>
>
>
> *EXTERNAL SENDER: This email originated from outside our email system. If
> you find this email suspicious please use the Report Phishing button in
> your Outlook to flag this to the Security Operations team.  *
>
>
>
>
>
>
>
> Hi, Jose
>
>
>
> I assume you are using the DataStream API.
>
>
>
> In general for any udf's exception in the DataStream job, only the
> developer of the DataStream job knows whether the exception can be
> tolerated. Because in some cases, tolerating exceptions can cause errors in
> the final result. So you still have to handle each udf exception yourself.
>
>
>
> However, there are indeed some points that can be optimized:
>
> 1. If you do have a lot of DataStream jobs, you can use some Java Lambda
> tricks to simplify these things, which may make the whole process  easier.
> For example, you can define a
> `sideOutputTheElementCausedTheException(processFunctionX, ...other
> parameters) ` in this function, once ProcessFunctionX throws any exception
> you output the exception Element to a SideOutput.
>
> 2. As for the differences in the types you mentioned, I tend to normalize
> them all into a json or use  avro format.
>
>
>
> But I think it is not easy work to replay all the exception elements.  It
> is only easy to do the replay with the source element.
>
>
> Best,
>
> Guowei
>
>
>
>
>
> On Fri, Apr 15, 2022 at 12:33 AM Jose Brandao 
> wrote:
>
> Hello,
>
> Searching some expertise on exception handling with checkpointing and
> streaming.  Let’s say some bad data flows into your Flink application and
> causes an exception you are not expecting. That exception will bubble up,
> ending up in killing the respective task and the app will not be able to
> progress. Eventually the topology will restart (if configured so) from the
> previous successful checkpoint/savepoint and will hit that broken message
> again, resulting in a loop.
>
>
>
> If we don’t know how to process a given message we would like our topology
> to progress and sink that message into some sort of dead-letter kafka
> topic.
>
>
>
> We have seen some recommendation on using Side Outputs
> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/side_output/>
>  for
> that but it looks like things can easily get messy with that. We would need
> to extend all our operators with try-catch blocks and side output messages
> within the catch. Then we would need to aggregate all those side outputs
> and decide what to do with them. If we want to output exactly the inbound
> message that originated the exception it requires some extra logic as well
> since our operators have different output types. On top of that it looks
> like the type of operators which allow side outputs is limited.
> https://stackoverflow.com/questions/52411705/flink-whats-the-best-way-to-handle-exceptions-inside-flink-jobs
>
>
>
> Wondering if there is a better way to do it? We would like to avoid our
> topology to be *stuck* because one message originates some unpredicted
> exception and we would also like to have as well the possibility to
> *replay* it once we put a fix in place, hence the dead letter topic idea.
>
>
>
> Regards,
>
> José Brandão
>
>
>
>
>
>
>
>


Re: Handling non-transient exceptions

2022-04-15 Thread Guowei Ma
Hi, Jose

I assume you are using the DataStream API.

In general for any udf's exception in the DataStream job, only the
developer of the DataStream job knows whether the exception can be
tolerated. Because in some cases, tolerating exceptions can cause errors in
the final result. So you still have to handle each udf exception yourself.

However, there are indeed some points that can be optimized:

1. If you do have a lot of DataStream jobs, you can use some Java Lambda
tricks to simplify these things, which may make the whole process  easier.
For example, you can define a
`sideOutputTheElementCausedTheException(processFunctionX, ...other
parameters) ` in this function, once ProcessFunctionX throws any exception
you output the exception Element to a SideOutput.

2. As for the differences in the types you mentioned, I tend to normalize
them all into a json or use  avro format.

But I think it is not easy work to replay all the exception elements.  It
is only easy to do the replay with the source element.

Best,
Guowei


On Fri, Apr 15, 2022 at 12:33 AM Jose Brandao  wrote:

> Hello,
>
> Searching some expertise on exception handling with checkpointing and
> streaming.  Let’s say some bad data flows into your Flink application and
> causes an exception you are not expecting. That exception will bubble up,
> ending up in killing the respective task and the app will not be able to
> progress. Eventually the topology will restart (if configured so) from the
> previous successful checkpoint/savepoint and will hit that broken message
> again, resulting in a loop.
>
>
>
> If we don’t know how to process a given message we would like our topology
> to progress and sink that message into some sort of dead-letter kafka
> topic.
>
>
>
> We have seen some recommendation on using Side Outputs
> 
>  for
> that but it looks like things can easily get messy with that. We would need
> to extend all our operators with try-catch blocks and side output messages
> within the catch. Then we would need to aggregate all those side outputs
> and decide what to do with them. If we want to output exactly the inbound
> message that originated the exception it requires some extra logic as well
> since our operators have different output types. On top of that it looks
> like the type of operators which allow side outputs is limited.
> https://stackoverflow.com/questions/52411705/flink-whats-the-best-way-to-handle-exceptions-inside-flink-jobs
>
>
>
> Wondering if there is a better way to do it? We would like to avoid our
> topology to be *stuck* because one message originates some unpredicted
> exception and we would also like to have as well the possibility to
> *replay* it once we put a fix in place, hence the dead letter topic idea.
>
>
>
> Regards,
>
> José Brandão
>
>
>
>
>
>
>


Re: Issue with doing filesink to HDFS

2022-04-13 Thread Guowei Ma
Hi,Anubhav

Would you like to share the result of `echo $HADOOP_CLASSPATH`  and the
detailed information after you set up the hadoop classpaht?

Best,
Guowei


On Wed, Apr 13, 2022 at 4:16 PM Anubhav Nanda 
wrote:

> Hi Guomei,
>
> That i already did but still getting the issue
>
> Regards,
> Anubhav
>
>
>
> [image: Mailtrack]
> <https://mailtrack.io?utm_source=gmail_medium=signature_campaign=signaturevirality11;>
>  Sender
> notified by
> Mailtrack
> <https://mailtrack.io?utm_source=gmail_medium=signature_campaign=signaturevirality11;>
>  13/04/22,
> 13:46:13
>
> On Wed, Apr 13, 2022 at 1:23 PM Guowei Ma  wrote:
>
>> Hi
>> I think you need to export HADOOP_CLASSPATH correclty. [1]
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/yarn/#preparation
>> Best,
>> Guowei
>>
>>
>> On Wed, Apr 13, 2022 at 12:50 PM Anubhav Nanda 
>> wrote:
>>
>>> Hi,
>>>
>>> I have setup flink 1.13.5 and we are using Hadoop 3.0.0 while we are
>>> running simple wordcount example we are getting following error
>>>
>>>
>>> ./flink-1.13.5/bin/flink run flink-1.13.5/examples/batch/WordCount.jar
>>> --input hdfs:///tmp/log4j.properties
>>>
>>>
>>> Caused by: org.apache.flink.runtime.JobException: Creating the input
>>> splits caused an error: Could not find a file system implementation for
>>> scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop
>>> file system to support this scheme could be loaded. For a full list of
>>> supported file systems, please see
>>> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
>>>
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:247)
>>>
>>> at
>>> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:792)
>>>
>>> at
>>> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:196)
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107)
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:190)
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:122)
>>>
>>> at
>>> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>>>
>>> at
>>> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>>>
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>>>
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:317)
>>>
>>> at
>>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>>>
>>> at
>>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>>>
>>> at
>>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>>>
>>> ... 8 more
>>>
>>> Caused by:
>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
>>> find a file system implementation for scheme 'hdfs'. The scheme is not
>>> directly supported by Flink and no Hadoop file system to support this
>>> scheme could be loaded. For a full list of supported file systems, please
>>> see
>>> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
>>>
>>> at
>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:530)
>>>
>>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:407)
>>>
>>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
>>>
>>> 

Re: Issue with doing filesink to HDFS

2022-04-13 Thread Guowei Ma
Hi
I think you need to export HADOOP_CLASSPATH correclty. [1]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/yarn/#preparation
Best,
Guowei


On Wed, Apr 13, 2022 at 12:50 PM Anubhav Nanda 
wrote:

> Hi,
>
> I have setup flink 1.13.5 and we are using Hadoop 3.0.0 while we are
> running simple wordcount example we are getting following error
>
>
> ./flink-1.13.5/bin/flink run flink-1.13.5/examples/batch/WordCount.jar
> --input hdfs:///tmp/log4j.properties
>
>
> Caused by: org.apache.flink.runtime.JobException: Creating the input
> splits caused an error: Could not find a file system implementation for
> scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop
> file system to support this scheme could be loaded. For a full list of
> supported file systems, please see
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:247)
>
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:792)
>
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:196)
>
> at
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:190)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:122)
>
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>
> at
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:317)
>
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>
> ... 8 more
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded. For a full list of supported file systems,
> please see
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
>
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:530)
>
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:407)
>
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
>
> at
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:599)
>
> at
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:63)
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:234)
>
> ... 21 more
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
> the classpath, or some classes are missing from the classpath.
>
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:189)
>
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:526)
>
> ... 26 more
>
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.hadoop.hdfs.HdfsConfiguration
>
> at
> org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(HadoopUtils.java:59)
>
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:84)
>
> ... 27 more
>
>
>
>
>
>
> [image: Mailtrack]
> 
>  Sender
> notified by
> Mailtrack
> 
>  13/04/22,
> 10:20:02
>


Re: Why first op after union cannot be chained?

2022-04-06 Thread Guowei Ma
Hi Zhanghao

AFAIK, you might to see the `StreamingJobGraphGenerator` not the
`JobGraphGenerator` which is only used by the old flink stream sql stack.
>From comment of the `StreamingJobGraphGenerator::isChainableInput` the `an
union operator` does not support chain currently.

Best,
Guowei


On Wed, Apr 6, 2022 at 12:11 AM Zhanghao Chen 
wrote:

> Dear all,
>
> I was recently investigating why the chaining behavior of a Flink SQL job
> containing union ops is a bit surprising. The SQL, simplified to the
> extreme, is as below:
>
> *CREATE  TABLE datagen_source (word VARCHAR)*
> *WITH ('connector' = 'datagen', 'rows-per-second' = '5');*
>
> *CREATE  TABLE blackhole_sink (word VARCHAR)*
> *WITH ('connector' = 'blackhole');*
>
> *INSERT INTO blackhole_sink*
> *SELECT  word*
> *FROM(*
> *SELECT  word*
> *FROMdatagen_source*
> *WHERE   word = '1'*
> *UNION ALL*
> *SELECT  word*
> *FROMdatagen_source*
> *WHERE   word = '1'*
> *)*
>
> With all the operators having the same parallelism, I thought all the ops
> should be chained, but it turns out that the sink is not chained. I found
> the following comment in the code piece for checking the eligibility of
> chaining in JobGraphGenerator::createSingleInputVertex:
> "*first op after union is stand-alone, because union is merged*" that
> could be relevant, but I'm not sure what it means.
>
> Could anyone enlighten me how to understand this?
>
> Best,
> Zhanghao Chen
>


Re: Is it possible to make SideOutput back to input stream?

2022-03-21 Thread Guowei Ma
Hi, Huang
>From the document[1] it seems that you need to close the iterate stream.
such as `iteration.closeWith(feedback);`
BTW You also could get a detailed iteration example from here [2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/#iterate
[2]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java

Best,
Guowei


On Mon, Mar 21, 2022 at 2:27 PM Chen-Che Huang  wrote:

> Hi all,
>
> We have an application where the operations on some keys depend on the
> results of related keys. Assume that there are
> two keys k1 and k2 that have some relationship between them. Our
> application won't send the value for key k1 to the data sink
> when the value for key k2 was sent to the data sink earlier. To do so, we
> hope that our Flink application can send some value
> information for key k2 to SideOutput and the SideOutput becomes the input
> of the original stream (see below).
>
> dataSource1
> .union(dataSource2)
> .iterate(
> inStream => {
> val outStream = inStream
> .keyBy(_.key)
> .connect(relationshipSource)
> .process(new CustomOperator())
>
> (outStream.getSideOutput(CustomOperator.Result), outStream)
> }
> )
> .disableChaining()
> .name(OperatorKey.Name).uid(OperatorKey.Name)
>
> However, although our Flink application can write value info to SideOutput
> successfully, the data in SideOutput won't be
> sent to the input stream. We wonder whether it's doable for our scenario
> with Flink? If so, how should we modify our code to
> achieve the goal? Many thanks for any comments.
>
> Best regards,
> Chen-Che Huang
>


Re: exception when parallelizing application

2022-03-21 Thread Guowei Ma
It seems that the key's hashcode is not stable.
So would you like to show the details of the `TraceKeyOuterClass.TraceKey`.

Best,
Guowei


On Sun, Mar 20, 2022 at 3:21 PM Prashant Deva  wrote:

> here is the key code (in kotlin)
>
>  val ks =  object: KeySelector TraceFragmentOuterClass.TraceFragment>, TraceKeyOuterClass.TraceKey> {
> override fun getKey(it:Tuple2 TraceFragmentOuterClass.TraceFragment>): TraceKeyOuterClass.TraceKey {
> return it.f0
> }
> }
>
> and here is the code that uses it:
>
> env.addSource(kafkaConsumer, name_source)
>
> .name(name_source).uid(name_source).setMaxParallelism(Config.MAX_PARALLELISM)
> .keyBy (ks)
>
> .window(EventTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.seconds(60)))
> .process(MyProcessor())
>
> .name(name_processor).uid(name_processor).setMaxParallelism(Config.MAX_PARALLELISM)
> .addSink(kafkaProducer)
> .uid(name_sink).name(name_sink)
>
>
> i am using protobufserializer from chill-protobuf library for serde. its
> configured as follows:
>
>
> env.config.registerTypeWithKryoSerializer(TraceFragmentOuterClass.TraceFragment::class.java,
> ProtobufSerializer::class.java)
>
> env.config.registerTypeWithKryoSerializer(TraceKeyOuterClass.TraceKey::class.java,
> ProtobufSerializer::class.java)
>
> env.config.registerTypeWithKryoSerializer(FullTraceOuterClass.FullTrace::class.java,
> ProtobufSerializer::class.java)
>
> env.config.registerTypeWithKryoSerializer(SpanOuterClass.Span::class.java,
> ProtobufSerializer::class.java)
>
>
> On Sun, Mar 20, 2022 at 12:15 AM caoyu  wrote:
>
>> Would you like copy the key code here to help debugging.
>>
>>  Replied Message 
>> From Prashant Deva 
>> Date 03/20/2022 12:24
>> To user 
>> Subject exception when parallelizing application
>> using flink 1.13.2. When i increase the parallelization of my application
>> from 1 to 2, i see the following exceptions. what do they mean? how can i
>> possibly fix this?
>>
>> java.lang.IllegalArgumentException: key group from 128 to 256 does not 
>> contain 89
>>  at 
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>  at 
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>  at 
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>  at 
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>  at 
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>  at 
>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>  at 
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:922)
>>  at 
>> org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:44)
>>  at 
>> org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
>>  at 
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:936)
>>  at 
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>>  at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>  at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>  at 
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
>>  at java.base/java.lang.Thread.run(Thread.java:829)
>>
>>


Re: Controlling group partitioning with DataStream

2022-03-08 Thread Guowei Ma
Hi, Ken

If you are talking about the Batch scene, there may be another idea that
the engine automatically and evenly distributes the amount of data to be
processed by each Stage to each worker node. This also means that, in some
cases, the user does not need to manually define a Partitioner.

At present, Flink has a FLIP-187 [1], which is working in this direction,
but to achieve the above goals may also require the follow up work of
FLIP-186 [2]. After the release of 1.15, we will carry out the
"Auto-rebalancing of workloads" related work as soon as possible, you can
pay attention to the progress of this FLIP.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler#FLIP187:AdaptiveBatchJobScheduler-Futureimprovements

Best,
Guowei


On Wed, Mar 9, 2022 at 8:44 AM Ken Krugler 
wrote:

> Hi Dario,
>
> Just to close the loop on this, I answered my own question on SO.
>
> Unfortunately it seems like the recommended solution is to do the same
> hack I did a while ago, which is to generate (via trial-and-error) a key
> that gets assigned to the target slot.
>
> I was hoping for something a bit more elegant :)
>
> I think it’s likely I could make it work by implementing my own version
> of KeyGroupStreamPartitioner, but as I’d noted in my SO question, that
> would involve use of some internal-only classes, so maybe not a win.
>
> — Ken
>
>
> On Mar 4, 2022, at 3:14 PM, Dario Heinisch 
> wrote:
>
> Hi,
>
> I think you are looking for this answer from David:
> https://stackoverflow.com/questions/69799181/flink-streaming-do-the-events-get-distributed-to-each-task-slots-separately-acc
>
> I think then you could technically create your partitioner - though little
> bit cubersome - by mapping your existing keys to new keys who will have
> then an output to the desired
> group & slot.
>
> Hope this may help,
>
> Dario
> On 04.03.22 23:54, Ken Krugler wrote:
>
> Hi all,
>
> I need to be able to control which slot a keyBy group goes to, in order to
> compensate for a badly skewed dataset.
>
> Any recommended approach to use here?
>
> Previously (with a DataSet) I used groupBy followed by a withPartitioner,
> and provided my own custom partitioner.
>
> I posted this same question to
> https://stackoverflow.com/questions/71357833/equivalent-of-dataset-groupby-withpartitioner-for-datastream
>
> Thanks,
>
> — Ken
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>


Re: CSV join in batch mode

2022-02-23 Thread Guowei Ma
Hi, Killian
Sorry for responding late!
I think there is no simple way that could catch csv processing errors. That
means that you need to do it yourself.(Correct me if I am missing
something).
I think you could use RockDB State Backend[1], which would spill data to
disk.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/state_backends/#rocksdb-state-backend-details

Best,
Guowei


On Mon, Feb 21, 2022 at 6:33 PM Killian GUIHEUX <
killian.guiheu...@thalesdigital.io> wrote:

> Hello all,
>
> I have to perform a join between two large csv sets that do not fit in
> ram. I process this two files in batch mode. I also need a side output to
> catch csv processing errors.
> So my question is what is the best way to this kind of join operation ? I
> think I should use a valueState state backend but would it work if my ram
> is my states goes larger than my RAM ?
>
> Regards.
>
> Killian
>
> This message contains confidential information and is intended only for
> the individual(s) addressed in the message. If you are not the named
> addressee, you should not disseminate, distribute, or copy this e-mail. If
> you are not the intended recipient, you are notified that disclosing,
> distributing, or copying this e-mail is strictly prohibited.
>


Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-21 Thread Guowei Ma
Thanks Ananth for your clarification.But I am not an expert on Pulsar.
I would cc the author of the connector to have a look. Would Yufei like to
give some insight?

Best,
Guowei


On Mon, Feb 21, 2022 at 2:10 PM Ananth Gundabattula <
agundabatt...@darwinium.com> wrote:

> Thanks for the response Guowei.
>
>
>
>- Tried a telnet to the jobmanager host:port and I get “*127.0.0.1:8086
><http://127.0.0.1:8086>: nodename nor servname provided, or not known*”
>which suggests that the network access is fine ?
>- I resubmitted the word count example and it ran fine to completion.
>
>
>
> For the pulsar script, I have also tried localhost, and the local LAN Ips
> as jobmanager host configuration in conf/flink.yaml and all of them end
> with the same result. I have also tried this with Pulsar 2.8.0 and it did
> have issues with “shared” subscription type (Get a “transactions not
> enabled” error in spite of enabling transactions in 2.8.0 broker).  When I
> change the subscription type to “Exclusive” it exhibits the same behavior
> as the Pulsar 2.9.1 version. i.e. The job manager submission fails. (in
> both 2.8.0 pulsar and 2.9.1 pulsar)
>
>
>
> Regards,
>
> Ananth
>
>
>
> *From: *Guowei Ma 
> *Date: *Monday, 21 February 2022 at 4:57 pm
> *To: *Ananth Gundabattula 
> *Cc: *user@flink.apache.org 
> *Subject: *Re: Pulsar connector 2.9.1 failing job submission in
> standalone.
>
> Hi, Ansanth
>
>
>
> I don't see any error logs on the server side, so it's hard to tell what
> the specific problem is. From the current log, there are two things to try
> first:
>
>
> 1. From the client's log, it is a 5-minute timeout, so you can telnet
> 127.0.0.1:8086 to see if there is a problem with the local network
> 2. From the log on the server side, there is no job submission at all. You
> can try to submit the wordcount example again when submitting the pulsar
> example fails. So as to rule out whether the session cluster is inherently
> problematic.
>
>
> Best,
>
> Guowei
>
>
>
>
>
> On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula <
> agundabatt...@darwinium.com> wrote:
>
> Hello All,
>
>
>
> I have a Pyflink script that needs to read from Pulsar and process the
> data.
>
>
>
> I have done the following to implement a prototype.
>
>
>
>1. Since I need Pyflink way to connect to Pulsar , I checked out the
>code from master branch as advised in a different thread. (PyFlink Pulsar
>connector seems to be slated for 1.15 release)
>2. I built the Flink source.
>3. I am using the following location as FLINK_HOME under the source:
>flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
>4. The python pyflink wheels have been appropriately installed in the
>right python conda environment.
>5. I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the
>$FLINK_HOME/lib folder.
>6. I started the standalone cluster by running bin/start-cluster.sh
>7. I submit my test script by using bin/flink run –python …
>8. If am launching the the word_count example in flink documentation,
>everything runs fine and it completes successfully.
>9. However, if the script involves the Pulsar connector, the logs show
>that the Flink client codebase is not able to submit the job to the
>Jobamanger.
>10. It ultimately dies with a Channel Idle exception. (See this in
>DEBUG mode of the logs). I am attaching the logs for reference.
>
>
>
> I am trying this on OSx. Please note that the classic word_count script
> works fine without any issues and I see the job submission failures on the
> client only when the pulsar source connector is in the script. I have also
> added the logs for the standalone session job manager.I am also attaching
> the script for reference.
>
>
>
> Could you please advise what can I do to resolve the issue. (Will raise an
> JIRA-Issue if someone thinks it is a bug).
>
>
>
> Regards,
>
> Ananth
>
>
>
>


Re: Apache Flink - Continuously streaming data using jdbc connector

2022-02-20 Thread Guowei Ma
Hi,

You can try flink's cdc connector [1] to see if it meets your needs.

[1] https://github.com/ververica/flink-cdc-connectors

Best,
Guowei


On Mon, Feb 21, 2022 at 6:23 AM M Singh  wrote:

> Hi Folks:
>
> I am trying to monitor a jdbc source and continuously streaming data in an
> application using the jdbc connector.  However, the application stops after
> reading the data in the table.
>
> I've checked the docs (
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/)
> and it looks like there is a streaming sink but the sources are scan and
> lookup only.  I've also checked the connector settings but could not find
> any flag for continuous monitoring.
>
> Can you please let me know if there is setting in the connector or advice
> to make the jdbc connector source streaming data continuously ?
>
> Thanks for your help.
>
> Mans
>


Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-20 Thread Guowei Ma
Hi, Ansanth

I don't see any error logs on the server side, so it's hard to tell what
the specific problem is. From the current log, there are two things to try
first:

1. From the client's log, it is a 5-minute timeout, so you can telnet
127.0.0.1:8086 to see if there is a problem with the local network
2. From the log on the server side, there is no job submission at all. You
can try to submit the wordcount example again when submitting the pulsar
example fails. So as to rule out whether the session cluster is inherently
problematic.

Best,
Guowei


On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula <
agundabatt...@darwinium.com> wrote:

> Hello All,
>
>
>
> I have a Pyflink script that needs to read from Pulsar and process the
> data.
>
>
>
> I have done the following to implement a prototype.
>
>
>
>1. Since I need Pyflink way to connect to Pulsar , I checked out the
>code from master branch as advised in a different thread. (PyFlink Pulsar
>connector seems to be slated for 1.15 release)
>2. I built the Flink source.
>3. I am using the following location as FLINK_HOME under the source:
>flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
>4. The python pyflink wheels have been appropriately installed in the
>right python conda environment.
>5. I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the
>$FLINK_HOME/lib folder.
>6. I started the standalone cluster by running bin/start-cluster.sh
>7. I submit my test script by using bin/flink run –python …
>8. If am launching the the word_count example in flink documentation,
>everything runs fine and it completes successfully.
>9. However, if the script involves the Pulsar connector, the logs show
>that the Flink client codebase is not able to submit the job to the
>Jobamanger.
>10. It ultimately dies with a Channel Idle exception. (See this in
>DEBUG mode of the logs). I am attaching the logs for reference.
>
>
>
> I am trying this on OSx. Please note that the classic word_count script
> works fine without any issues and I see the job submission failures on the
> client only when the pulsar source connector is in the script. I have also
> added the logs for the standalone session job manager.I am also attaching
> the script for reference.
>
>
>
> Could you please advise what can I do to resolve the issue. (Will raise an
> JIRA-Issue if someone thinks it is a bug).
>
>
>
> Regards,
>
> Ananth
>
>
>


Re: How to get FileName when using FileSink in Flink

2022-01-30 Thread Guowei Ma
Hi,Kartik

FileSink does not expose the file name to the user now. 
Would you like to share your scenario ,which needs the file name?

Best,
Guowei


发自我的iPhone

> 在 2022年1月30日,下午6:38,Kartik Khare  写道:
> 
> Hi,
> For my use case, I want to get the part file name that is being created in 
> the HDFS when using the file sink. I saw the code and can’t find a way to 
> extend one of the existing Bucket classes to achieve this. The thing I am 
> looking for is `assemblePartFilePath` functions output.
> 
> Regards,
> Kartik


Re: create savepoint on bounded source in streaming mode

2022-01-26 Thread Guowei Ma
Hi, Shawn

Thank you for your sharing. Unfortunately I do not think there is an easy
way to achieve this now.
Actually we have a customer who has the same requirement but the scenario
is a little different. The bounded and unbounded pipeline have some
differences but the customer wants reuse some state of the bounded pipeline.
Another question is what the api does the pipelined use? DataStream or SQL

Best,
Guowei


On Wed, Jan 26, 2022 at 8:58 PM Shawn Du  wrote:

>right!
>
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 19:50
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi,Shawn
>
> You want to use the correct state(n-1) for day n-1 and the full amount of
> data for day n to produce the correct state(n) for day n.
>  Then use state(n) to initialize a job to process the data for day n+1.
>  Am I understanding this correctly?
>
> Best,
> Guowei
>
> Shawn Du 于2022年1月26日 周三下午7:15写道:
> Hi Gaowei,
>
> think the case:
> we have one streaming application built by flink, but kinds of
> reason, the event may be disordered or delayed terribly.
> we want to replay the data day by day(the data was processed like
> reordered.). it looks like a batching job but with state. we want to use
> the same code for replaying.
> thus we need persist the state for next job. any ideas?
>
> Thanks
> Shawn
>
>
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 15:39
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi Shawn
> Currently Flink can not trigger the sp at the end of the input. An
> alternative way might be that you need to develop a customized source,
> which triggers a savepoint when it notices that all the input split has
> been handled.
> Or you could see the state process api[1], which might be helpful.
>
> Thanks for your sharing but I have another little question:
> I think you need to process all the historical events to rebuild the
> correct state. So there might be no gain even if you periodically create a
> savepoint. So why did you need to "rebuild" the state periodically? Do I
> miss something?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
>
> Best,
> Guowei
>
>
> On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:
>
>our application is stateful. processing live events depends on the
> state. but for kinds of reason, we need rebuild the state. it will be very
> costly to replay all data.
>our historical events data are stored in s3. so we want to create
> states/savepoints periodically so that we can rebuild the state from a
> point. we call this as a bootstrap process.
>any ideas?
>
>Thanks.
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 14:04
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi, Shawn
> I think Flink does not support this mechanism yet.
> Would you like to share the scenario in which you need this savepoint at
> the end of the bounded input?
> Best,
> Guowei
>
>
> On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
> Hi experts,
>
> assume I have several files and I want replay these files in order in
> streaming mode and create a savepoint when files play at the end. it is
> possible?
> I wrote a simple test app, and job are finished when source is at the end.
> I have no chance to creat a savepoint. please help.
>
> Thanks
> Shawn
>
>
> --
> Best,
> Guowei
>
>
>


Re: create savepoint on bounded source in streaming mode

2022-01-26 Thread Guowei Ma
Hi,Shawn

You want to use the correct state(n-1) for day n-1 and the full amount of
data for day n to produce the correct state(n) for day n.
 Then use state(n) to initialize a job to process the data for day n+1.
 Am I understanding this correctly?

Best,
Guowei

Shawn Du 于2022年1月26日 周三下午7:15写道:

> Hi Gaowei,
>
> think the case:
> we have one streaming application built by flink, but kinds of
> reason, the event may be disordered or delayed terribly.
> we want to replay the data day by day(the data was processed like
> reordered.). it looks like a batching job but with state. we want to use
> the same code for replaying.
> thus we need persist the state for next job. any ideas?
>
> Thanks
> Shawn
>
>
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 15:39
>
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi Shawn
> Currently Flink can not trigger the sp at the end of the input. An
> alternative way might be that you need to develop a customized source,
> which triggers a savepoint when it notices that all the input split has
> been handled.
> Or you could see the state process api[1], which might be helpful.
>
> Thanks for your sharing but I have another little question:
> I think you need to process all the historical events to rebuild the
> correct state. So there might be no gain even if you periodically create a
> savepoint. So why did you need to "rebuild" the state periodically? Do I
> miss something?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/
>
> Best,
> Guowei
>
>
> On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:
>
>our application is stateful. processing live events depends on the
> state. but for kinds of reason, we need rebuild the state. it will be very
> costly to replay all data.
>our historical events data are stored in s3. so we want to create
> states/savepoints periodically so that we can rebuild the state from a
> point. we call this as a bootstrap process.
>any ideas?
>
>Thanks.
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 14:04
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi, Shawn
> I think Flink does not support this mechanism yet.
> Would you like to share the scenario in which you need this savepoint at
> the end of the bounded input?
> Best,
> Guowei
>
>
> On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
> Hi experts,
>
> assume I have several files and I want replay these files in order in
> streaming mode and create a savepoint when files play at the end. it is
> possible?
> I wrote a simple test app, and job are finished when source is at the end.
> I have no chance to creat a savepoint. please help.
>
> Thanks
> Shawn
>
>
> --
Best,
Guowei


Re: create savepoint on bounded source in streaming mode

2022-01-25 Thread Guowei Ma
Hi Shawn
Currently Flink can not trigger the sp at the end of the input. An
alternative way might be that you need to develop a customized source,
which triggers a savepoint when it notices that all the input split has
been handled.
Or you could see the state process api[1], which might be helpful.

Thanks for your sharing but I have another little question:
I think you need to process all the historical events to rebuild the
correct state. So there might be no gain even if you periodically create a
savepoint. So why did you need to "rebuild" the state periodically? Do I
miss something?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/

Best,
Guowei


On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:

>
>our application is stateful. processing live events depends on the
> state. but for kinds of reason, we need rebuild the state. it will be very
> costly to replay all data.
>our historical events data are stored in s3. so we want to create
> states/savepoints periodically so that we can rebuild the state from a
> point. we call this as a bootstrap process.
>any ideas?
>
>Thanks.
>
> --
> Sender:Guowei Ma 
> Sent At:2022 Jan. 26 (Wed.) 14:04
> Recipient:Shawn Du 
> Cc:user 
> Subject:Re: create savepoint on bounded source in streaming mode
>
> Hi, Shawn
> I think Flink does not support this mechanism yet.
> Would you like to share the scenario in which you need this savepoint at
> the end of the bounded input?
> Best,
> Guowei
>
>
> On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:
> Hi experts,
>
> assume I have several files and I want replay these files in order in
> streaming mode and create a savepoint when files play at the end. it is
> possible?
> I wrote a simple test app, and job are finished when source is at the end.
> I have no chance to creat a savepoint. please help.
>
> Thanks
> Shawn
>
>
>


Re: create savepoint on bounded source in streaming mode

2022-01-25 Thread Guowei Ma
Hi, Shawn
I think Flink does not support this mechanism yet.
Would you like to share the scenario in which you need this savepoint at
the end of the bounded input?
Best,
Guowei


On Wed, Jan 26, 2022 at 1:50 PM Shawn Du  wrote:

> Hi experts,
>
> assume I have several files and I want replay these files in order in
> streaming mode and create a savepoint when files play at the end. it is
> possible?
> I wrote a simple test app, and job are finished when source is at the end.
> I have no chance to creat a savepoint. please help.
>
> Thanks
> Shawn
>


Re: Tuning akka.ask.timeout

2022-01-24 Thread Guowei Ma
Hi
After 1.14.0 I think Flink should work well even at the 1000*1000 scale +
10s akka.timeout in the deploy stage.
So thank you for any further feedback after you investigate.

BTW: I think you might look at
https://issues.apache.org/jira/browse/FLINK-24295, which might cause the
problem.

Best,
Guowei



On Mon, Jan 24, 2022 at 4:31 PM Paul Lam  wrote:

> Hi Guowei,
>
> Thanks a lot for your reply.
>
> I’m using 1.14.0. The timeout happens at job deployment time. A subtask
> would run for a short period of `akka.ask.timeout` before fails due to the
> timeout.
>
> I noticed that jobmanager have a very hight CPU usage at the moment, like
> 2000%. I’m reasoning about the cause by profiling.
>
> Best,
> Paul Lam
>
> 2022年1月21日 09:56,Guowei Ma  写道:
>
> Hi, Paul
>
> Would you like to share some information such as the Flink version you
> used and the memory of TM and JM.
> And when does the timeout happen? Such as at begin of the job or during
> the running of the job
>
> Best,
> Guowei
>
>
> On Thu, Jan 20, 2022 at 4:45 PM Paul Lam  wrote:
>
>> Hi,
>>
>> I’m tuning a Flink job with 1000+ parallelism, which frequently fails
>> with Akka TimeOutException (it was fine with 200 parallelism).
>>
>> I see some posts recommend increasing `akka.ask.timeout` to 120s. I’m not
>> familiar with Akka but it looks like a very long time compared to the
>> default 10s and as a response timeout.
>>
>> So I’m wondering what’s the reasonable range for this option? And why
>> would the Actor fail to respond in time (the message was dropped due to
>> pressure)?
>>
>> Any input would be appreciated! Thanks a lot.
>>
>> Best,
>> Paul Lam
>>
>>
>


Re: FileSource Usage

2022-01-20 Thread Guowei Ma
Hi, Meghajit

Thanks Meghajit for sharing your user case.
I found a workaround way that you could try to name your file in a
timestamp style. More details could be found here[1].
Another little concern is that Flink is a distributed system, which means
that we could not assume any order even if we list the file in the created
order.

[1]
https://stackoverflow.com/questions/49045725/gsutil-gcloud-storage-file-listing-sorted-date-descending
Best,
Guowei


On Thu, Jan 20, 2022 at 11:11 PM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hi Guowei,
>
> Thanks for your answer. Regarding your question,
> *> Currently there is no such public interface ,which you could extend to
> implement your own strategy. Would you like to share the specific problem
> you currently meet?*
>
> The GCS bucket that we are trying to read from is periodically populated
> with parquet files by another service. This can be daily or even hourly.
> For an already pre-populated bucket, we would like to read the files
> created from, say, day *T* till day *T+10*.  Order matters here and hence
> we would like to read the oldest files first, and then the new ones.  Would
> you know how I can enforce a reading order here ?
>
> Thanks,
> Meghajit
>
>
>
>
> On Thu, Jan 20, 2022 at 2:29 PM Guowei Ma  wrote:
>
>> Hi, Meghajit
>>
>> 1. From the implementation [1] the order of split depends on the
>> implementation of the FileSystem.
>>
>> 2. From the implementation [2] the order of the file also depends on the
>> implementation of the FileSystem.
>>
>> 3. Currently there is no such public interface ,which you could extend to
>> implement your own strategy. Would you like to share the specific problem
>> you currently meet?
>>
>> 3. `FileSource` supports checkpoints. I think the watermark is a general
>> mechanism so you could read the related documentation[3].
>>
>> [1]
>> https://github.com/apache/flink/blob/355b165859aebaae29b6425023d352246caa0613/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java#L141
>>
>> [2]
>> https://github.com/apache/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java#L102
>>
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/event-time/generating_watermarks/
>> Best,
>> Guowei
>>
>>
>> On Wed, Jan 19, 2022 at 6:06 PM Meghajit Mazumdar <
>> meghajit.mazum...@gojek.com> wrote:
>>
>>> Hello,
>>>
>>> We are using FileSource
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/>
>>> to process Parquet Files and had a few doubts around it. Would really
>>> appreciate if somebody can help answer them:
>>>
>>> 1. For a given file, does FileSource read the contents inside it in
>>> order ? In other words, what is the order in which the file splits are
>>> generated from the contents of the file ?
>>>
>>> 2. We want to provide a GCS Bucket URL to the FileSource so that it can
>>> read parquet files from there. The bucket has multiple parquet files.
>>> Wanted to know, what is the order in which the files will be picked and
>>> processed by this FileSource ? Can we provide an order strategy ourselves,
>>> say, process according to creation time ?
>>>
>>> 3. Is it possible/good practice to apply checkpointing and watermarking
>>> for a bounded source like FileSource ?
>>>
>>> --
>>> *Regards,*
>>> *Meghajit*
>>>
>>
>
> --
> *Regards,*
> *Meghajit*
>


Re: Tuning akka.ask.timeout

2022-01-20 Thread Guowei Ma
Hi, Paul

Would you like to share some information such as the Flink version you used
and the memory of TM and JM.
And when does the timeout happen? Such as at begin of the job or during the
running of the job

Best,
Guowei


On Thu, Jan 20, 2022 at 4:45 PM Paul Lam  wrote:

> Hi,
>
> I’m tuning a Flink job with 1000+ parallelism, which frequently fails with
> Akka TimeOutException (it was fine with 200 parallelism).
>
> I see some posts recommend increasing `akka.ask.timeout` to 120s. I’m not
> familiar with Akka but it looks like a very long time compared to the
> default 10s and as a response timeout.
>
> So I’m wondering what’s the reasonable range for this option? And why
> would the Actor fail to respond in time (the message was dropped due to
> pressure)?
>
> Any input would be appreciated! Thanks a lot.
>
> Best,
> Paul Lam
>
>


Re: FileSource Usage

2022-01-20 Thread Guowei Ma
Hi, Meghajit

1. From the implementation [1] the order of split depends on the
implementation of the FileSystem.

2. From the implementation [2] the order of the file also depends on the
implementation of the FileSystem.

3. Currently there is no such public interface ,which you could extend to
implement your own strategy. Would you like to share the specific problem
you currently meet?

3. `FileSource` supports checkpoints. I think the watermark is a general
mechanism so you could read the related documentation[3].

[1]
https://github.com/apache/flink/blob/355b165859aebaae29b6425023d352246caa0613/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java#L141

[2]
https://github.com/apache/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java#L102

[3]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/event-time/generating_watermarks/
Best,
Guowei


On Wed, Jan 19, 2022 at 6:06 PM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello,
>
> We are using FileSource
>  to
> process Parquet Files and had a few doubts around it. Would really
> appreciate if somebody can help answer them:
>
> 1. For a given file, does FileSource read the contents inside it in order
> ? In other words, what is the order in which the file splits are generated
> from the contents of the file ?
>
> 2. We want to provide a GCS Bucket URL to the FileSource so that it can
> read parquet files from there. The bucket has multiple parquet files.
> Wanted to know, what is the order in which the files will be picked and
> processed by this FileSource ? Can we provide an order strategy ourselves,
> say, process according to creation time ?
>
> 3. Is it possible/good practice to apply checkpointing and watermarking
> for a bounded source like FileSource ?
>
> --
> *Regards,*
> *Meghajit*
>


Re: Prom Pushgateway Reporter HTTPS support

2022-01-18 Thread Guowei Ma
Hi,Mason
I assign the jira to you.
Thanks for your contribution.
Best,
Guowei


On Wed, Jan 19, 2022 at 2:07 PM Mason Chen  wrote:

> Hi all,
>
> There is some interest from our users to use prometheus push gateway
> reporter with a https endpoint. So, I've filed
> https://issues.apache.org/jira/browse/FLINK-25697 and I figured that it
> would be acceptable since influxdb reporter supports something similar.
> Could someone assign me this ticket--I’d like to help contribute this back
> to OSS!
>
> Best,
> Mason


Re: How does Flink add third-party jars without the service going offline

2022-01-18 Thread Guowei Ma
Hi, summer
>>>Now I need to use a third-party jar in the Flink service, should I put
it under ${FLINK_HOME}/lib?
I think maybe an alternative way is to put the third-party jar into a fat
jar.

>>>How to enable Flink to automatically load third-party jars?

In general this is the JVM mechanism. It means that the jar would be loaded
if they are in the classpath.
Best,
Guowei


On Wed, Jan 19, 2022 at 11:09 AM summer  wrote:

> Hello
>
> Now I need to use a third-party jar in the Flink service, should I put
> it under ${FLINK_HOME}/lib? How to enable Flink to automatically load
> third-party jars?
>
>
> TH
>


Re: mutual authentication with ssl

2021-11-25 Thread Guowei Ma
Hi Rahul

>From your description I guess maybe you could try different flink.yaml(one
for server and another for client).
I am not an expert about SSL and security stuff.  So please correct me if I
am wrong.

Best,
Guowei


On Wed, Nov 24, 2021 at 3:54 AM Rahul  wrote:

> Hello,
> I am trying to set up ssl mutual authentication for the external rest
> endpoints, following this doc
> .
> But it looks like currently flink expects the same exact path for
> keystore/truststore to be present on both client and server sides.
> Unfortunately with the current production setup we have, we cannot have the
> key/trustore under the same path on both  client and the server.
> Can anyone recommend a way around this?
> Ideally it would have been nice to have something like security.ssl.rest.
> *server*.keystore and security.ssl.rest.*client*.keystore, but any
> pointers here is appreciated.
>
> Thanks,
> Rahul
>


Re: Wrapping a GenericRecord in a Tuple

2021-11-25 Thread Guowei Ma
Hi Joseph
Would you like to give more details about the error message?
Best,
Guowei


On Thu, Nov 25, 2021 at 2:59 AM Joseph Lorenzini 
wrote:

> Hi all,
>
>
>
> I have an implementation of KafkaDeserializationSchema interface that
> deserializes a kafka consumer record into a generic record.
>
>
>
> Per the docs, you need to specify the produced type to be
> GenericRecordAvroTypeInfo.
>
>
>
>
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>
>
>
> In the getProducedType method, I return an instance of
> GenericRecordAvroTypeInfo with an appropriate schema.  This works as
> expected.
>
>
>
> However, when I wrap this in a tuple, I get a deserialization error. Is
> this user error on my part or is this not support?
>
>
>
> public TypeInformation>
> getProducedType() {
>
> GenericRecordAvroTypeInfo deserializerType = new
> GenericRecordAvroTypeInfo(deserializerSchema);
>
> GenericRecordAvroTypeInfo errType = new
> GenericRecordAvroTypeInfo(GenericRecordDeserializerErrorBuilder.getSchema());
>
> return new TupleTypeInfo<>(deserializerType, errType);
>
> }
>
>
>
>
>
> Thanks,
>
> Joe
> Privileged/Confidential Information may be contained in this message. If
> you are not the addressee indicated in this message (or responsible for
> delivery of the message to such person), you may not copy or deliver this
> message to anyone. In such case, you should destroy this message and kindly
> notify the sender by reply email. Please advise immediately if you or your
> employer does not consent to Internet email for messages of this kind.
> Opinions, conclusions and other information in this message that do not
> relate to the official business of my firm shall be understood as neither
> given nor endorsed by it.
>


Re: What is Could not retrieve file from transient blob store?

2021-11-04 Thread Guowei Ma
leHandler.java:135)
> at
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> at
> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve file
> from transient blob store.
> ... 10 more
> Caused by: java.io.FileNotFoundException: Local file
> /tmp/blobStore-9cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b
> does not exist and failed to copy from blob store.
> at
> org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:516)
> at
> org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
> at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:369)
> at
> org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler.lambda$respondToRequest$0(AbstractTaskManagerFileHandler.java:133)
> ... 9 more
> 2021-11-02 23:20:22,703 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
>  - Unhandled exception.
> org.apache.flink.util.FlinkException: Could not retrieve file from
> transient blob store.
> at
> org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler.lambda$respondToRequest$0(AbstractTaskManagerFileHandler.java:135)
> at
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> at
> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: Local file
> /tmp/blobStore-9cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b
> does not exist and failed to copy from blob store.
> at
> org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:516)
> at
> org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
> at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:369)
> at
> org.apache.flink.runtime.rest.handler.taskmanager.AbstractTaskManagerFileHandler.lambda$respondToRequest$0(AbstractTaskManagerFileHandler.java:133)
> ... 9 more
> 2021-11-02 23:47:57,865 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [xxjob-0001/xx.72:37007]
> failed with java.io.IOException: Connection reset by peer
> 2021-11-02 23:47:57,912 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system
> [akka.tcp://flink@xxjob-0001:37007] has failed, address is now gated
> for [50] ms. Reason: [Disassociated]
> 2021-11-02 23:53:41,565 WARN  akka.remote.transport.netty.NettyTransport
>  - Remote connection to [xxjob-0001/xx.72:42961]
> failed with java.io.IOException: Connection reset by peer
> 2021-11-02 23:53:41,571 WARN  akka.remote.ReliableDeliverySupervisor
>  - Association with remote system
> [akka.tcp://flink-metrics@xxjob-0001:42961] has failed, address is
> now gated for [50] ms. Reason: [Disassociated]
>
> On Thu, 4 Nov 2021 at 03:45, Guowei Ma  wrote:
>
>> >>>Ok I missed the log below. I guess when the task manager was stopped
>> this

Re: What is Could not retrieve file from transient blob store?

2021-11-04 Thread Guowei Ma
>>>Ok I missed the log below. I guess when the task manager was stopped
this happened.
I think if the TM stopped you also would not get the log. But It will throw
another "UnknownTaskExecutorException", which would include something like
“No TaskExecutor registered under ”.

>>> But I guess it's ok and not a big issue???
Does this happen continuously?

Best,
Guowei


On Thu, Nov 4, 2021 at 12:39 AM John Smith  wrote:

> Ok I missed the log below. I guess when the task manager was stopped this
> happened.
>
> I attached the full sequence. But I guess it's ok and not a big issue???
>
>
> 2021-11-02 23:20:22,682 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.
> TaskManagerLogFileHandler - Failed to transfer file from TaskExecutor 7e1
> b7db5918004e4160fdecec1bbdad7.
> java.util.concurrent.CompletionException: org.apache.flink.util.
> FlinkException: Could not retrieve file from transient blob store.
> at org.apache.flink.runtime.rest.handler.taskmanager.
> AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
> AbstractTaskManagerFileHandler.java:135)
> at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> .java:670)
> at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> CompletableFuture.java:646)
> at java.util.concurrent.CompletableFuture$Completion.run(
> CompletableFuture.java:456)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
> .run(NioEventLoop.java:515)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
> at org.apache.flink.shaded.netty4.io.netty.util.internal.
> ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve file
> from transient blob store.
> ... 10 more
> Caused by: java.io.FileNotFoundException: Local file /tmp/blobStore-9
> cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3
> c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b does
> not exist and failed to copy from blob store.
> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
> .java:516)
> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
> .java:444)
> at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:
> 369)
> at org.apache.flink.runtime.rest.handler.taskmanager.
> AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
> AbstractTaskManagerFileHandler.java:133)
> ... 9 more
> 2021-11-02 23:20:22,703 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.
> TaskManagerLogFileHandler - Unhandled exception.
> org.apache.flink.util.FlinkException: Could not retrieve file from
> transient blob store.
> at org.apache.flink.runtime.rest.handler.taskmanager.
> AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
> AbstractTaskManagerFileHandler.java:135)
> at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> .java:670)
> at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> CompletableFuture.java:646)
> at java.util.concurrent.CompletableFuture$Completion.run(
> CompletableFuture.java:456)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
> .run(NioEventLoop.java:515)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
> at org.apache.flink.shaded.netty4.io.netty.util.internal.
> ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: Local file /tmp/blobStore-9
> cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3
> c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b does
> not exist and failed to copy from blob store.
> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
> .java:516)
> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
> .java:444)
> at org.apache.flink.runtime.blob.BlobServer.getFile(BlobSer

Re: Data Stream countWindow followed by keyBy does not preserve time order

2021-11-04 Thread Guowei Ma
Hi Yan
After a second thought I think you are right, the downstream operator
should keep the order of the same key from the same upstream. So feel free
to open a jira.
Best,
Guowei


On Wed, Nov 3, 2021 at 7:30 PM Yan Shen  wrote:

> Hi,
>
> It will complicate things a lot if we cannot assume input order of any
> operator after a keyBy. So far I only have the problem with countWindow
> which I seem to be able to avoid by writing my own stateful KeyedProcess.
> Are there other operators which might cause the same problem?
>
> The other alternative is not to use batch mode, but the problem is that I
> wont know when a batch job finishes if I don't run it in batch mode since a
> streaming process will never end.
>
> Thanks.
>
> On Wed, Nov 3, 2021 at 4:38 PM Guowei Ma  wrote:
>
>> Hi, Yan
>> I do not think it is a bug. Maybe we could not assume the input's order
>> of an operator simply.
>> Best,
>> Guowei
>>
>>
>> On Wed, Nov 3, 2021 at 3:10 PM Yan Shen  wrote:
>>
>>> Yes, it does not happen in streaming mode. Is this considered a bug or
>>> is it by design?
>>>
>>> Thanks!
>>>
>>> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma  wrote:
>>>
>>>> Hi
>>>>
>>>> I did not run your program directly, but I see that you are now using
>>>> the Batch execution mode. I suspect it is related to this, because in the
>>>> Batch execution mode FLINK will "sort" the Key (this might be an unstable
>>>> sort).
>>>> So would you like to experiment with the results of running with
>>>> Streaming mode and to see what happens?
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen  wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Can anyone advise on this?
>>>>>
>>>>> I wrote a simple test of the countWindow method (in Kotlin) as below
>>>>>
>>>>> package aero.airlab.flinkjobs.headingreminder
>>>>>
>>>>> import org.apache.flink.api.common.RuntimeExecutionMode
>>>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>>>> import 
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>>> import kotlin.random.Random
>>>>>
>>>>> object CountWindowTest {
>>>>> @JvmStatic
>>>>> fun main(args: Array) {
>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>> env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>>>>
>>>>> val rand = Random(0)
>>>>> val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>>>>> env.fromCollection(data).assignTimestampsAndWatermarks(
>>>>> WatermarkStrategy.forMonotonousTimestamps>()
>>>>> .withTimestampAssigner { e, _ -> e.second.toLong() })
>>>>> .keyBy { it.first }
>>>>> .countWindow(3L, 1)
>>>>> .reduce { a, b -> b }
>>>>> .keyBy { it.first }
>>>>> .filter { it.first == 5 }
>>>>> .print()
>>>>>
>>>>> env.execute()
>>>>> }
>>>>> }
>>>>>
>>>>>
>>>>> The beginning of the output is as such
>>>>>
>>>>> 12> (5, 184)
>>>>> 12> (5, 18)
>>>>> 12> (5, 29)
>>>>> 12> (5, 37)
>>>>> 12> (5, 38)
>>>>> 12> (5, 112)
>>>>> 12> (5, 131)
>>>>>
>>>>> The first line (5, 184) is not in order from the rest.
>>>>>
>>>>> Is this a bug? The problem disappears if I remove the keyBy after the
>>>>> reduce.
>>>>>
>>>>> Thanks.
>>>>>
>>>>


Re: Data Stream countWindow followed by keyBy does not preserve time order

2021-11-03 Thread Guowei Ma
Hi, Yan
I do not think it is a bug. Maybe we could not assume the input's order of
an operator simply.
Best,
Guowei


On Wed, Nov 3, 2021 at 3:10 PM Yan Shen  wrote:

> Yes, it does not happen in streaming mode. Is this considered a bug or is
> it by design?
>
> Thanks!
>
> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma  wrote:
>
>> Hi
>>
>> I did not run your program directly, but I see that you are now using the
>> Batch execution mode. I suspect it is related to this, because in the Batch
>> execution mode FLINK will "sort" the Key (this might be an unstable sort).
>> So would you like to experiment with the results of running with
>> Streaming mode and to see what happens?
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen  wrote:
>>
>>> Hi all,
>>>
>>> Can anyone advise on this?
>>>
>>> I wrote a simple test of the countWindow method (in Kotlin) as below
>>>
>>> package aero.airlab.flinkjobs.headingreminder
>>>
>>> import org.apache.flink.api.common.RuntimeExecutionMode
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> import kotlin.random.Random
>>>
>>> object CountWindowTest {
>>> @JvmStatic
>>> fun main(args: Array) {
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment()
>>> env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>>
>>> val rand = Random(0)
>>> val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>>> env.fromCollection(data).assignTimestampsAndWatermarks(
>>> WatermarkStrategy.forMonotonousTimestamps>()
>>> .withTimestampAssigner { e, _ -> e.second.toLong() })
>>> .keyBy { it.first }
>>> .countWindow(3L, 1)
>>> .reduce { a, b -> b }
>>> .keyBy { it.first }
>>> .filter { it.first == 5 }
>>> .print()
>>>
>>> env.execute()
>>> }
>>> }
>>>
>>>
>>> The beginning of the output is as such
>>>
>>> 12> (5, 184)
>>> 12> (5, 18)
>>> 12> (5, 29)
>>> 12> (5, 37)
>>> 12> (5, 38)
>>> 12> (5, 112)
>>> 12> (5, 131)
>>>
>>> The first line (5, 184) is not in order from the rest.
>>>
>>> Is this a bug? The problem disappears if I remove the keyBy after the
>>> reduce.
>>>
>>> Thanks.
>>>
>>


Re: Question on BoundedOutOfOrderness

2021-11-03 Thread Guowei Ma
Hi Oliver

I think Alexey is right that you could not assume that the record would be
output in the event time order.
And there is a small addition.I see your output and there are actually
multiple concurrencies (probably 11 subtasks). You also can't expect these
concurrencies to be ordered according to event time.

Best,
Guowei


On Wed, Nov 3, 2021 at 6:46 AM Alexey Trenikhun  wrote:

> Hi Oliver,
> I believe you also need to do sort, out of order ness watermark strategy
> only “postpone” watermark for given expected maximum of out of orderness.
> Check Ververica example -
> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java
>
> Alexey
> --
> *From:* Oliver Moser 
> *Sent:* Tuesday, November 2, 2021 12:52:59 PM
> *To:* user@flink.apache.org 
> *Subject:* Question on BoundedOutOfOrderness
>
> Hi!
>
> I am investigating the use of Flink for a new project and started some
> simple demos.
>
> Currently I am stuck at the point where I need to deal with events
> arriving out of order based on their event time. I’ve spent quite some time
> researching on SO, the docs, the Ververica training (excellent resource
> btw), however, I assume I still run into some conceptual misconceptions :-)
>
> I put together the following demo code, and I would expect that the
> console output would list the events chronologically based on their
> embedded event time. However, events are always printed in the same order
> as they are pushed into the data stream by the OutOfOrderEventSource.
>
> Sample console output:
>
> —
> 3> EventRecord{id=3, counter=1, eventId=0,
> timestamp=2021-11-02T20:10:01.554}
> 4> EventRecord{id=2, counter=2, eventId=1,
> timestamp=2021-11-02T20:10:02.810}
> 5> EventRecord{id=0, counter=3, eventId=0,
> timestamp=2021-11-02T20:09:59.815}
> 6> EventRecord{id=4, counter=4, eventId=1,
> timestamp=2021-11-02T20:10:00.815}
> 7> EventRecord{id=1, counter=5, eventId=1,
> timestamp=2021-11-02T20:10:05.819}
> 8> EventRecord{id=4, counter=6, eventId=0,
> timestamp=2021-11-02T20:10:04.819}
> 9> EventRecord{id=0, counter=7, eventId=1,
> timestamp=2021-11-02T20:10:03.824}
> 10> EventRecord{id=0, counter=8, eventId=1,
> timestamp=2021-11-02T20:10:05.828}
> 11> EventRecord{id=3, counter=9, eventId=1,
> timestamp=2021-11-02T20:10:09.829}
> —
>
> My expectation would be to receive the events ordered:
>
> —
> 5> EventRecord{id=0, counter=3, eventId=0,
> timestamp=2021-11-02T20:09:59.815}
> 6> EventRecord{id=4, counter=4, eventId=1,
> timestamp=2021-11-02T20:10:00.815}
> 3> EventRecord{id=3, counter=1, eventId=0,
> timestamp=2021-11-02T20:10:01.554}
> 4> EventRecord{id=2, counter=2, eventId=1,
> timestamp=2021-11-02T20:10:02.810}
> 9> EventRecord{id=0, counter=7, eventId=1,
> timestamp=2021-11-02T20:10:03.824}
> …
> —
>
>
> Given a BoundedOutOfOrderness watermarking strategy with a 20 seconds
> duration, my expectation would have been that for the first event that is
> pushed to the demo source
>
> EventRecord{id=3, counter=1, eventId=0, timestamp=2021-11-02T20:10:01.554}
>
> this would set the initial watermark to "2021-11-02T20:09:41.554”, hence
> events that are older than this timestamp are not considered, but events
> younger than this timestamps are considered and ordering of events happens
> accordingly. That would bean that
>
> EventRecord{id=0, counter=3, eventId=0, timestamp=2021-11-02T20:09:59.815}
>
> would still be considered on time.
>
> I’m sure I am missing something conceptually.
>
> Here is the code that I’m using:
>
> ---
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> import java.time.Duration;
> import java.time.LocalDateTime;
> import java.time.ZoneOffset;
> import java.time.temporal.ChronoUnit;
> import java.util.Random;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import static java.time.Instant.ofEpochMilli;
> import static java.time.LocalDateTime.ofInstant;
>
> public class SimpleOutOfOrderDemo {
>
>public static void main(String... args) throws Exception {
>   var env = StreamExecutionEnvironment.getExecutionEnvironment();
>   var watermarkStrategy =
>  WatermarkStrategy.forGenerator(
>
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)))
> .withTimestampAssigner((e, rt) -> e.timestamp);
>
>   var source = new OutOfOrderEventSource();
>
> env.addSource(source).assignTimestampsAndWatermarks(watermarkStrategy).print();
>   env.execute("Simple Out of Order Demo");
>}
>
>public static class OutOfOrderEventSource implements
> SourceFunction {
>
>   static final int MAX_ELEMENTS = 10;
>
>   static final long INTERVAL = 1000;
>
>   AtomicInteger counter = new AtomicInteger();
>
>   @Override
>   public void 

Re: Why do the count windows in Flink Table APIs require processing time for sorting whereas in Flink Datastream APIs they do not

2021-11-03 Thread Guowei Ma
Hi Long

>From the API point of view, this processing time can be omitted. This is
mainly for unification: event-time scenarios, and alignment
with other window APIs.

Thanks Jark Wu  for telling me this offline.

Best,
Guowei


On Wed, Nov 3, 2021 at 11:55 AM Long Nguyễn 
wrote:

> I have read about the Window operator
> 
> in Flink documentation and know that it groups rows into finite groups
> based on time or row-count intervals.
>
> I saw an example of a sliding count window right there
> 
> :
>
> // Sliding Row-count window (assuming a processing-time attribute 
> "proctime").window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));
>
> As mentioned in the docs, the on method here is to define:
>
>> The time attribute to group (time interval) or sort (row count) on. For
>> batch queries this might be any Long or Timestamp attribute. For streaming
>> queries this must be a declared event-time or processing-time time
>> attribute.
>
>
> On the other hand, I searched found this countWindow
> 
>  method
> in Flink's Java docs and saw that it does not specify any time-related
> parameter.
> I'm wondering why a sliding count window in Flink Table APIs requires
> processing time whereas it is unnecessary in the Datastream APIs.
>
> I really appreciate it if someone can clarify this for me.
>
> --
> 
> --
> Nguyen Dich Long,
> School of Information and Communication Technology (SoICT -
> https://www.soict.hust.edu.vn)
> Hanoi University of Science and Technology (https://www.hust.edu.vn)
> 601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha
> Noi, Vietnam
> Tel: +84 (0)3.54.41.76.76
> Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com
>


Re: Is there a way to update checkpoint configuration for a job "in-place"?

2021-11-03 Thread Guowei Ma
Hi Kevin
If you want to change this configuration(execution.checkpointing.timeout)
without restarting the job, as far as I know, there may not be such a
method.
But could you consider increasing this value by default?

Best,
Guowei


On Wed, Nov 3, 2021 at 5:15 AM Kevin Lam  wrote:

> Hi all,
>
> We run a Flink application on Kubernetes in Application Mode using Kafka
> with exactly-once-semantics and high availability.
>
> We are looking into a specific failure scenario: a flink job that has too
> short a checkpoint timeout (execution.checkpointing.timeout) and at some
> point during the job's execution, checkpoints begin to fail.
>
> Is there a way to update the checkpoint timeout
> (execution.checkpointing.timeout) of this job, in-place ie. without
> creating a new job, or restoring from an old savepoint/checkpoint? Note:
> one idea may be to take a savepoint, and then restore from that savepoint
> with the new configuration, however this is not possible because if
> checkpoints are timing out, so are savepoints and thus save points cannot
> be taken. Are there any other ways to handle this situation?
>
> We want to ensure exactly-once semantics are respected.
>
> Thanks in advance!
>


Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-03 Thread Guowei Ma
Thank Daisy& Kevin much for your introduction to the improvement of TM
blocking shuffle, credit base+io scheduling is indeed a very interesting
thing. At the same time, I look forward to this as a default setting for tm
blocking shuffle.

Best,
Guowei


On Wed, Nov 3, 2021 at 2:46 PM Gen Luo  wrote:

> Thanks Daisy and Kevin! The benchmark results look really exciting!
>
> On Tue, Nov 2, 2021 at 4:38 PM David Morávek  wrote:
>
>> Thanks Daisy and Kevin for a great write up! ;) Especially the 2nd part
>> was really interesting, I really like the idea of the single spill file
>> with a custom scheduling of read requests.
>>
>> Best,
>> D.
>>
>> On Mon, Nov 1, 2021 at 10:01 AM Daisy Tsang  wrote:
>>
>>> Hey everyone, we have a new two-part post published on the Apache Flink
>>> blog about the sort-based blocking shuffle implementation in Flink.  It
>>> covers benchmark results, design and implementation details, and more!  We
>>> hope you like it and welcome any sort of feedback on it. :)
>>>
>>>
>>> https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
>>> https://flink.apache.org/2021/10/26/sort-shuffle-part2.html
>>>
>>


Re: What is Could not retrieve file from transient blob store?

2021-11-03 Thread Guowei Ma
Hi, Smith

It seems that the log file(blob_t-274d3c2d5acd78ced877d898b1877b10b62a64df-
590b54325d599a6782a77413691e0a7b) is deleted for some reason. But AFAIK
there are no other guys reporting this exception.(Maybe other guys know
what would happen).
1. I think if you could refresh the page and you would see the correct
result because this would trigger another file retrieving from TM.
2. And It might be more safe that setting an dedicated blob
directory path(other than /tmp) `blob.storage.directory`[1]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#blob-storage-directory


Best,
Guowei


On Wed, Nov 3, 2021 at 7:50 AM John Smith  wrote:

> Hi running Flink 1.10.0 With 3 zookeepers, 3 job nodes and 3 task nodes.
> and I saw this exception on the job node logs...
> 2021-11-02 23:20:22,703 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.
> TaskManagerLogFileHandler - Unhandled exception.
> org.apache.flink.util.FlinkException: Could not retrieve file from
> transient blob store.
> at org.apache.flink.runtime.rest.handler.taskmanager.
> AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
> AbstractTaskManagerFileHandler.java:135)
> at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> .java:670)
> at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> CompletableFuture.java:646)
> at java.util.concurrent.CompletableFuture$Completion.run(
> CompletableFuture.java:456)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
> .run(NioEventLoop.java:515)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
> at org.apache.flink.shaded.netty4.io.netty.util.internal.
> ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: Local file /tmp/blobStore-9
> cb73f27-11db-4c42-a3fc-9b77f558e722/no_job/blob_t-274d3
> c2d5acd78ced877d898b1877b10b62a64df-590b54325d599a6782a77413691e0a7b does
> not exist and failed to copy from blob store.
> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
> .java:516)
> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer
> .java:444)
> at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:
> 369)
> at org.apache.flink.runtime.rest.handler.taskmanager.
> AbstractTaskManagerFileHandler.lambda$respondToRequest$0(
> AbstractTaskManagerFileHandler.java:133)
> ... 9 more
>


Re: Data Stream countWindow followed by keyBy does not preserve time order

2021-11-02 Thread Guowei Ma
Hi

I did not run your program directly, but I see that you are now using the
Batch execution mode. I suspect it is related to this, because in the Batch
execution mode FLINK will "sort" the Key (this might be an unstable sort).
So would you like to experiment with the results of running with Streaming
mode and to see what happens?

Best,
Guowei


On Wed, Nov 3, 2021 at 12:16 AM Yan Shen  wrote:

> Hi all,
>
> Can anyone advise on this?
>
> I wrote a simple test of the countWindow method (in Kotlin) as below
>
> package aero.airlab.flinkjobs.headingreminder
>
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import kotlin.random.Random
>
> object CountWindowTest {
> @JvmStatic
> fun main(args: Array) {
> val env = StreamExecutionEnvironment.getExecutionEnvironment()
> env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>
> val rand = Random(0)
> val data = (0..1000).map { Pair(rand.nextInt(10), it) }
> env.fromCollection(data).assignTimestampsAndWatermarks(
> WatermarkStrategy.forMonotonousTimestamps>()
> .withTimestampAssigner { e, _ -> e.second.toLong() })
> .keyBy { it.first }
> .countWindow(3L, 1)
> .reduce { a, b -> b }
> .keyBy { it.first }
> .filter { it.first == 5 }
> .print()
>
> env.execute()
> }
> }
>
>
> The beginning of the output is as such
>
> 12> (5, 184)
> 12> (5, 18)
> 12> (5, 29)
> 12> (5, 37)
> 12> (5, 38)
> 12> (5, 112)
> 12> (5, 131)
>
> The first line (5, 184) is not in order from the rest.
>
> Is this a bug? The problem disappears if I remove the keyBy after the
> reduce.
>
> Thanks.
>


Re: Flink sink data to DB and then commit data to Kafka

2021-11-02 Thread Guowei Ma
Hi, Qihua

AFAIK there is no way to do it. Maybe you need to implement a "new" sink to
archive this target.

Best,
Guowei


On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang  wrote:

> Hi,
>
> Our flink application has two sinks(DB and kafka topic). We want to push
> same data to both sinks. Is it possible to push data to kafka topic only
> after data is pushed to DB successfully? If the commit to DB fail, we don't
> want those data is pushed to kafka.
>
> Thanks,
> Qihua
>


Re: Write Streaming data to S3 in Parquet files

2021-09-27 Thread Guowei Ma
Hi,Harshvardhan

I think you could use some factory such as `ParquetAvroWriters.for`
form `ParquetAvroWriters.java` [1].
And you could see more same class in the package
`org.apache.flink.formats.parquet.`

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java

Best,
Guowei


On Mon, Sep 27, 2021 at 2:36 AM Harshvardhan Shinde <
harshvardhan.shi...@oyorooms.com> wrote:

> Hi,
>
> Thanks for the response.
>
> How can this streaming data be written to S3 for the path to be given?
> Also I see that the FileSink takes GenericRecord, so how can the
> DataStream be converted to a GenericRecord?
>
> Please bear with me if my questions don't make any sense.
>
> On Sun, Sep 26, 2021 at 9:12 AM Guowei Ma  wrote:
>
>> Hi, Harshvardhan
>>
>> I think CaiZhi is right.
>> I only have a small addition. Because I see that you want to convert
>> Table to DataStream, you can look at FileSink (ParquetWriterFactory)[1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/file_sink/#bulk-encoded-formats
>>
>> Best,
>> Guowei
>>
>>
>> On Sun, Sep 26, 2021 at 10:31 AM Caizhi Weng 
>> wrote:
>>
>>> Hi!
>>>
>>> Try the PARTITIONED BY clause. See
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/parquet/
>>>
>>> Harshvardhan Shinde  于2021年9月24日周五
>>> 下午5:52写道:
>>>
>>>> Hi,
>>>> I wanted to know if we can write streaming data to S3 in parquet format
>>>> with partitioning.
>>>> Here's what I want to achieve:
>>>> I have a kafka table which gets updated with the data from kafka topic
>>>> and I'm using select statement to get the data into a Table and converting
>>>> into a stream as:
>>>>
>>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>>> Table table = tableEnv.sqlQuery("Select * from test");
>>>> DataStream stream = tableEnv.toDataStream(table);
>>>>
>>>> Now I want to write this stream to S3 in parquet files with hourly
>>>> partitions.
>>>>
>>>> Here are my questions:
>>>> 1. Is this possible?
>>>> 2. If yes, how it can be achieved or link to appropriate documentation.
>>>>
>>>> Thanks and Regards,
>>>> Harshvardhan
>>>>
>>>>
>
> --
> Thanks and Regards,
> Harshvardhan
> Data Platform
>


Re: Write Streaming data to S3 in Parquet files

2021-09-25 Thread Guowei Ma
Hi, Harshvardhan

I think CaiZhi is right.
I only have a small addition. Because I see that you want to convert Table
to DataStream, you can look at FileSink (ParquetWriterFactory)[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/file_sink/#bulk-encoded-formats

Best,
Guowei


On Sun, Sep 26, 2021 at 10:31 AM Caizhi Weng  wrote:

> Hi!
>
> Try the PARTITIONED BY clause. See
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/formats/parquet/
>
> Harshvardhan Shinde  于2021年9月24日周五
> 下午5:52写道:
>
>> Hi,
>> I wanted to know if we can write streaming data to S3 in parquet format
>> with partitioning.
>> Here's what I want to achieve:
>> I have a kafka table which gets updated with the data from kafka topic
>> and I'm using select statement to get the data into a Table and converting
>> into a stream as:
>>
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>> Table table = tableEnv.sqlQuery("Select * from test");
>> DataStream stream = tableEnv.toDataStream(table);
>>
>> Now I want to write this stream to S3 in parquet files with hourly
>> partitions.
>>
>> Here are my questions:
>> 1. Is this possible?
>> 2. If yes, how it can be achieved or link to appropriate documentation.
>>
>> Thanks and Regards,
>> Harshvardhan
>>
>>


Re: Job Manager went down on cancelling job with savepoint

2021-09-25 Thread Guowei Ma
Hi, Puneet

Could you share whether you are using Flink's session mode or application
mode?
>From the log, you are using `StandaloneDispatcher`, but you will use it in
both session and application mode.
If you use application mode, this might be in line with expectations.

Best,
Guowei


On Fri, Sep 24, 2021 at 9:19 PM Puneet Duggal 
wrote:

> Hi,
>
> So while cancelling one job with savepoint… even though job got cancelled
> successfully .. but somehow immediately after that job manager went down.
> Not able to deduce anything from given stack trace.. Any help is appreciated
>
> 2021-09-24 11:50:44,182 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping
> checkpoint coordinator for job 1f764a51996d206b28721aa4a1420bea.
> 2021-09-24 11:50:44,182 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Shutting down
> 2021-09-24 11:50:44,240 INFO
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore [] - Removing
> /flink/default_ns/checkpoints/1f764a51996d206b28721aa4a1420bea from
> ZooKeeper
> 2021-09-24 11:50:44,243 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter [] -
> Shutting down.
> 2021-09-24 11:50:44,243 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter [] -
> Removing /checkpoint-counter/1f764a51996d206b28721aa4a1420bea from ZooKeeper
> 2021-09-24 11:50:44,249 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
> 1f764a51996d206b28721aa4a1420bea reached globally terminal state CANCELED.
> 2021-09-24 11:50:44,249 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL:
> Thread 'cluster-io-thread-16' produced an uncaught exception. Stopping the
> process...
> java.util.concurrent.RejectedExecutionException: Task
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@54a5137c
> rejected from 
> java.util.concurrent.ScheduledThreadPoolExecutor@37ee0790[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 4513]
> at
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
> ~[?:1.8.0_232]
> at
> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_232]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
>
> Regards,
> Puneet
>
>
>


Re: stream processing savepoints and watermarks question

2021-09-24 Thread Guowei Ma
Hi, JING

Thanks for the case.
But I am not sure this would happen. As far as I know the event timer could
only be triggered when there is a watermark (except the "quiesce phase").
I think it could not advance any watermarks after MAX_WATERMARK is received.

Best,
Guowei


On Fri, Sep 24, 2021 at 4:31 PM JING ZHANG  wrote:

> Hi Guowei,
> I could provide a case that I have encountered which timers to fire
> indefinitely when doing drain savepoint.
> After an event timer is triggered, it registers another event timer
> whose value equals the value of triggered timer plus an interval time.
> If a MAX_WATERMARK comes, the timer is triggered, then registers another
> timer and forever.
> I'm not sure whether Macro meets a similar problem.
>
> Best,
> JING ZHANG
>
>
>
> Guowei Ma  于2021年9月24日周五 下午4:01写道:
>
>> Hi Macro
>>
>> Indeed, as mentioned by JING, if you want to drain when triggering
>> savepoint, you will encounter this MAX_WATERMARK.
>> But I have a problem. In theory, even with MAX_WATERMARK, there will not
>> be an infinite number of timers. And these timers should be generated by
>> the application code.
>> You can share your code if it is convenient for you.
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Sep 24, 2021 at 2:02 PM JING ZHANG  wrote:
>>
>>> Hi Macro,
>>> Do you specified drain flag when stop a job with a savepoint?
>>> If the --drain flag is specified, then a MAX_WATERMARK will be emitted
>>> before the last checkpoint barrier.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Marco Villalobos  于2021年9月24日周五 下午12:54写道:
>>>
>>>> Something strange happened today.
>>>> When we tried to shutdown a job with a savepoint, the watermarks became
>>>> equal to 2^63 - 1.
>>>>
>>>> This caused timers to fire indefinitely and crash downstream systems
>>>> with overloaded untrue data.
>>>>
>>>> We are using event time processing with Kafka as our source.
>>>>
>>>> It seems impossible for a watermark to be that large.
>>>>
>>>> I know its possible stream with a batch execution mode.  But this was
>>>> stream processing.
>>>>
>>>> What can cause this?  Is this normal behavior when creating a savepoint?
>>>>
>>>


Re: stream processing savepoints and watermarks question

2021-09-24 Thread Guowei Ma
Hi Macro

Indeed, as mentioned by JING, if you want to drain when triggering
savepoint, you will encounter this MAX_WATERMARK.
But I have a problem. In theory, even with MAX_WATERMARK, there will not be
an infinite number of timers. And these timers should be generated by the
application code.
You can share your code if it is convenient for you.

Best,
Guowei


On Fri, Sep 24, 2021 at 2:02 PM JING ZHANG  wrote:

> Hi Macro,
> Do you specified drain flag when stop a job with a savepoint?
> If the --drain flag is specified, then a MAX_WATERMARK will be emitted
> before the last checkpoint barrier.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
>
> Best,
> JING ZHANG
>
> Marco Villalobos  于2021年9月24日周五 下午12:54写道:
>
>> Something strange happened today.
>> When we tried to shutdown a job with a savepoint, the watermarks became
>> equal to 2^63 - 1.
>>
>> This caused timers to fire indefinitely and crash downstream systems with
>> overloaded untrue data.
>>
>> We are using event time processing with Kafka as our source.
>>
>> It seems impossible for a watermark to be that large.
>>
>> I know its possible stream with a batch execution mode.  But this was
>> stream processing.
>>
>> What can cause this?  Is this normal behavior when creating a savepoint?
>>
>


Re: Exact S3 Permissions to allow a flink job to use s3 for checkpointing

2021-09-24 Thread Guowei Ma
Hi, Thomas

I am not an expert of s3 but I think Flinkneed write/read/delete(maybe
list) permission of the path(bucket).
BTW, What error did you encounter?

Best,
Guowei


On Fri, Sep 24, 2021 at 5:00 AM Thomas Wang  wrote:

> Hi,
>
> I'm trying to figure out what exact s3 permissions does a flink job need
> to work appropriately when using s3 for checkpointing. Currently, I have
> the following IAM Policy, but it seems insufficient. Can anyone help me
> figure this out? Thanks.
>
> {
> Action = [
> "s3:PutObject",
> "s3:GetObject",
> ]
> Effect = "Allow"
> Resource = "arn:aws:s3::://*"
> },
>
> Thomas
>


Re: byte array as keys in Flink

2021-09-24 Thread Guowei Ma
Hi Hill

As far as I know you could not use byte[] as a keyby. You could find more
information from [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/#keyby

Best,
Guowei


On Fri, Sep 24, 2021 at 3:15 PM Caizhi Weng  wrote:

> Hi!
>
> It depends on the state backend you use. For example if you use a heap
> memory state backend it is backed by a hash map and it uses the hash code
> of byte[] to compare the two byte[] (see HeapMapState#put). However for
> rocksdb state backend it uses the serialized bytes (that is to say, the
> content of byte[]) to compare with the records and thus two byte[] with the
> same content can match (see RocksDBMapState#put).
>
> Dan Hill  于2021年9月24日周五 上午7:43写道:
>
>> *Context*
>> I want to perform joins based on UUIDs.  String version is less efficient
>> so I figured I should use the byte[] version.  I did a shallow dive into
>> the Flink code I'm not sure it's safe to use byte[] as a key (since it uses
>> object equals/hashcode).
>>
>> *Request*
>> How do other Flink devs do for byte[] keys? I want to use byte[] as a key
>> in a MapState.
>>
>>
>>


Re: Flink Native Kubernetes - Configuration kubernetes.flink.log.dir not working

2021-09-14 Thread Guowei Ma
Hi

Maybe you could try the `kubectl describe pod -n ${namespace} ${podname}`
to see what happened atm.

Best,
Guowei


On Tue, Sep 14, 2021 at 2:58 PM bat man  wrote:

> Hello Guowei,
>
> The pods terminate almost within a second so am unable to pull any logs.
> Is there any way I can pull the logs?
>
> Thanks,
> Hemant
>
> On Tue, Sep 14, 2021 at 12:22 PM Guowei Ma  wrote:
>
>> Hi,
>>
>> Could you share some logs when the job fails?
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Sep 13, 2021 at 10:59 PM bat man  wrote:
>>
>>> Hi,
>>>
>>> I am running a POC to evaluate Flink on Native Kubernetes. I tried
>>> changing the default log location by using the configuration -
>>> kubernetes.flink.log.dir
>>> However, the job in application mode fails after bringing up the task
>>> manager. This is the command I use -
>>>
>>> ./bin/flink run-application --target kubernetes-application
>>> -Dkubernetes.cluster-id=flink-k8s-poc-app
>>> -Dkubernetes.container.image=
>>> -Dkubernetes.flink.log.dir="/var/log/containers"
>>> local:///opt/flink/usrlib/uber.jar
>>>
>>> Is there something else which needs to be done to write logs to a
>>> different location like creating the folders in the custom image.
>>>
>>> Thanks.
>>>
>>


Re: JVM Metaspace capacity planning

2021-09-14 Thread Guowei Ma
Hi, Puneet
In general every job  has its own classloader. You could find more detailed
information from doc [1].
You could put some common jar into the "/lib" to avoid this [2].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code

Best,
Guowei


On Mon, Sep 13, 2021 at 10:06 PM Puneet Duggal 
wrote:

> Hi,
>
> Thank you for quick reply. So in my case i am using Datastream Apis.Each
> job is a real time processing engine which consumes data from kafka and
> performs some processing on top of it before ingesting into sink.
>
> JVM Metaspace size earlier set was around 256MB (default) which i had to
> increase to 3GB so that ~30 parallel jobs can run simultaneously on single
> task manager.
>
> Regards,
> Puneet
>
> On 13-Sep-2021, at 5:46 PM, Caizhi Weng  wrote:
>
> Hi!
>
> Which API are you using? The datastream API or the Table / SQL API? If it
> is the Table / SQL API then some Java classes for some operators (for
> example aggregations, projection, filter, etc.) will be generated when
> compiling user code to executable Java code. These Java classes are new to
> the JVM. So if you're running too many jobs in the same Flink cluster a
> metaspace OOM might occur. There is already a JIRA ticket for this [1].
>
> I don't know much about the behavior of class loaders, so I'll wait for
> others to apply in this aspect.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15024
>
> Puneet Duggal  于2021年9月13日周一 下午7:49写道:
>
>> Hi,
>>
>> So on going through multiple resources, got basic idea that JVM Metaspace
>> is used by flink class loader to load class metadata which is used to
>> create objects in heap. Also this is a one time activity since all the
>> objects of single class require single class metadata object in JVM
>> Metaspace.
>>
>> But while deploying multiple jobs on task manager, i saw almost linear
>> increase in consumption of metaspace (irrespective of parallelism). Even if
>> those multiple jobs have exactly same implementation. So wanted to confirm
>> if each job in flink has its own class loader which loads required classes
>> in Task Manager JVM Metaspace.
>>
>> PS: Any documentation for this will be of great help.
>>
>> Thanks,
>> Puneet
>
>
>


Re: Flink Native Kubernetes - Configuration kubernetes.flink.log.dir not working

2021-09-14 Thread Guowei Ma
Hi,

Could you share some logs when the job fails?

Best,
Guowei


On Mon, Sep 13, 2021 at 10:59 PM bat man  wrote:

> Hi,
>
> I am running a POC to evaluate Flink on Native Kubernetes. I tried
> changing the default log location by using the configuration -
> kubernetes.flink.log.dir
> However, the job in application mode fails after bringing up the task
> manager. This is the command I use -
>
> ./bin/flink run-application --target kubernetes-application
> -Dkubernetes.cluster-id=flink-k8s-poc-app
> -Dkubernetes.container.image=
> -Dkubernetes.flink.log.dir="/var/log/containers"
> local:///opt/flink/usrlib/uber.jar
>
> Is there something else which needs to be done to write logs to a
> different location like creating the folders in the custom image.
>
> Thanks.
>


Re: Flink Task/Operator metrics renaming

2021-09-13 Thread Guowei Ma
Hi, Ashutosh

As far as I know, there is no way to rename the system metrics name.
But would you like to share why you need to rename the metrics ?

Best,
Guowei


On Mon, Sep 13, 2021 at 2:29 PM Ashutosh Uttam 
wrote:

> Hi team,
>
> We are using PrometheusReporter to expose Flink metrics to Prometheus. Is
> there possibility of renaming Task/Operators metric like numRecordsIn,
> numRecordsOut etc. and exposing it to Prometheus.
>
> Regards,
> Ashutosh
>


Re: TaskManagers OOM'ing for Flink App with very large state only when restoring from checkpoint

2021-09-13 Thread Guowei Ma
Hi, Kevin

1. Could you give me some specific information, such as what version of
Flink is you using, and is it using DataStream or SQL?
2. As far as I know, RocksDB will put state on disk, so it will not consume
memory all the time and cause OOM in theory.
So you can see if there are any object leaks by analyzing the Jmap of
TaskManger after Failover.
3. There is another way, you can trigger a save point first, and then
resume the job from the save point to see if there is still OOM,
 if not, then it is likely to be related to your application code.

Best,
Guowei


On Sat, Sep 11, 2021 at 2:01 AM Kevin Lam  wrote:

> Hi all,
>
> We've seen scenarios where TaskManagers will begin to OOM, shortly after a
> job restore from checkpoint. Our flink app has a very large state (100s of
> GB) and we use RocksDB as a backend.
>
> Our repro is something like this: run the job for an hour and let it
> accumulate state, kill a task manager. The job restores properly, but then
> minutes later task managers begin to be killed on K8S due to OOM, and this
> causes a degenerate state where the job restores and new OOMs cause the job
> to restore again and it never recovers.
>
> We've tried increasing the TaskManager memory (doubled), and observed that
> OOMs still happen even when the allocated k8s container memory is not maxed
> out.
>
> Can you shed some light on what happens during a restore process? How are
> checkpoints loaded, and how does this affect the memory pressure of task
> managers (that for eg. have had a task running, got it cancelled, and
> re-assigned a new task as part of restore)?
>
> Any help is appreciated!
>
>


Re: Triggers for windowed aggregations in Table API

2021-09-06 Thread Guowei Ma
Hi, John

If you want to trigger the aggregation calculation of the window “earlier”
I think you might need to define your customized window trigger in
DataStream yourself.
I think you could find more detailed information from the doc [1]

If you think that the customize the trigger is a little harder I think you
could use the `PorcessFunction` in DataStream.
You could extend this `ProcessFunction` to simulate early trigger window
behavior. There is a good example[2] in the doc.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#example

Best,
Guowei


On Sat, Sep 4, 2021 at 2:16 AM John Smith 
wrote:

> Thanks Guowei and Caizhi.
> As Guowei noted, I am using Table API and it seems that it does not
> support triggers at the moment. Is there a plan to support custom triggers
> in Table API/SQL too?
> Also, if I follow Guowei's suggestion, should I use DataStream for other
> parts of the aggregate computation too or is there a way to create a
> GroupedWindowedTable from the DataStream?
>
> Thanks,
>
> On Thu, Sep 2, 2021 at 9:24 PM Guowei Ma  wrote:
>
>> Hi, John
>>
>> I agree with Caizhi that you might need to customize a window trigger.
>> But there is a small addition, you need to convert Table to DataStream
>> first.
>> Then you can customize the trigger of the window. Because as far as I
>> know, Table API does not support custom windows yet. For details on how to
>> convert, you can refer to [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#datastream-api-integration
>> Best,
>> Guowei
>>
>>
>> On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng  wrote:
>>
>>> Hi!
>>>
>>> You might want to use your custom trigger to achieve this.
>>>
>>> Tumble windows are using EventTimeTrigger by default. Flink has another
>>> built-in trigger called CountTrigger but it only fires for every X records,
>>> ignoring the event time completely. You might want to create your own
>>> trigger to combine the two, or more specifically, combine
>>> EventTimeTrigger#onEventTime and CountTrigger#onElement.
>>>
>>> For more about custom triggers see
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
>>>
>>> John Smith  于2021年9月3日周五 上午2:00写道:
>>>
>>>> Hi,
>>>>
>>>> Sorry if this has been answered previously but I couldn't find any
>>>> answer for the question and would appreciate any help.
>>>> Context:
>>>> Let's say I have a log stream in Kafka where message values have an
>>>> *id* field along with a few other fields. I want to count the number
>>>> of messages for each id for a tumbling window of* ten minutes *and if
>>>> the count for any id in the given window is higher than 5, I want to write
>>>> the message into the sink topic. However, I don't want to wait until the
>>>> end of the 10 minute window to emit the result and want to immediately emit
>>>> the result when the count is more than 5 for an id in the window.
>>>> For example, if I see 6 messages in the first minute for an id, I want to
>>>> trigger a write with the count of 6 in the sink topic immediately and not
>>>> wait the whole 10 minutes.
>>>> The following code does the aggregation but emits the results at the
>>>> end of the window. How can I trigger the emitting result earlier?
>>>>
>>>> final Table resultTable = sourceTable
>>>> .select( $("id")
>>>> , $("key")
>>>> 
>>>> .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w")   )
>>>> .groupBy($("w"), $("id"))
>>>> .select($("w").start().as("WindowStart"), $("id"), 
>>>> $("key").count().as("count"))
>>>> ;
>>>>
>>>> resultTable.execute().print();
>>>>
>>>>
>>>> Thanks in advance!
>>>>
>>>>


Re: Triggers for windowed aggregations in Table API

2021-09-02 Thread Guowei Ma
Hi, John

I agree with Caizhi that you might need to customize a window trigger. But
there is a small addition, you need to convert Table to DataStream first.
Then you can customize the trigger of the window. Because as far as I know,
Table API does not support custom windows yet. For details on how to
convert, you can refer to [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#datastream-api-integration
Best,
Guowei


On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng  wrote:

> Hi!
>
> You might want to use your custom trigger to achieve this.
>
> Tumble windows are using EventTimeTrigger by default. Flink has another
> built-in trigger called CountTrigger but it only fires for every X records,
> ignoring the event time completely. You might want to create your own
> trigger to combine the two, or more specifically, combine
> EventTimeTrigger#onEventTime and CountTrigger#onElement.
>
> For more about custom triggers see
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
>
> John Smith  于2021年9月3日周五 上午2:00写道:
>
>> Hi,
>>
>> Sorry if this has been answered previously but I couldn't find any answer
>> for the question and would appreciate any help.
>> Context:
>> Let's say I have a log stream in Kafka where message values have an *id*
>> field along with a few other fields. I want to count the number of messages
>> for each id for a tumbling window of* ten minutes *and if the count for
>> any id in the given window is higher than 5, I want to write the message
>> into the sink topic. However, I don't want to wait until the end of the 10
>> minute window to emit the result and want to immediately emit the result
>> when the count is more than 5 for an id in the window. For example, if I
>> see 6 messages in the first minute for an id, I want to trigger a write
>> with the count of 6 in the sink topic immediately and not wait the whole 10
>> minutes.
>> The following code does the aggregation but emits the results at the end
>> of the window. How can I trigger the emitting result earlier?
>>
>> final Table resultTable = sourceTable
>> .select( $("id")
>> , $("key")
>> 
>> .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w")   )
>> .groupBy($("w"), $("id"))
>> .select($("w").start().as("WindowStart"), $("id"), 
>> $("key").count().as("count"))
>> ;
>>
>> resultTable.execute().print();
>>
>>
>> Thanks in advance!
>>
>>


Re: Flink on Kubernetes

2021-09-02 Thread Guowei Ma
Hi, Julian

I notice that your configuration
includes "restart-strategy.fixed-delay.attempts: 10". It means that the job
would fail after 10 times failure. So maybe it leads to the job not
restarting again and you could increase this value.
But I am not sure if this is the root cause. So if this does not work I
think you could share the log at that time and the flink version you use.

Best,
Guowei


On Fri, Sep 3, 2021 at 2:00 AM Julian Cardarelli  wrote:

> Hello –
>
>
>
> We have implemented Flink on Kubernetes with Google Cloud Storage in high
> availability configuration as per the below configmap. Everything appears
> to be working normally, state is being saved to GCS.
>
>
>
> However, every now and then – perhaps weekly or every other week, all of
> the submitted jobs are lost and the cluster appears completely reset.
> Perhaps GKE is doing maintenance or something of this nature, but the point
> being that the cluster does not resume from this activity in an operational
> state with all jobs placed into running status.
>
>
>
> Is there something we are missing? Thanks!
>
> -jc
>
>
>
>
>
> apiVersion: v1
>
> kind: ConfigMap
>
> metadata:
>
>   name: flink-config
>
>   labels:
>
> app: flink
>
> data:
>
>   flink-conf.yaml: |+
>
> jobmanager.rpc.address: flink-jobmanager
>
> taskmanager.numberOfTaskSlots: 1
>
> blob.server.port: 6124
>
> jobmanager.rpc.port: 6123
>
> taskmanager.rpc.port: 6122
>
> jobmanager.heap.size: 1024m
>
> taskmanager.memory.process.size: 1024m
>
> kubernetes.cluster-id: cluster1
>
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>
> high-availability.storageDir: gs://
> storage-uswest.yyy.com/kubernetes-flink
>
> state.backend: filesystem
>
> state.checkpoints.dir: gs://
> storage-uswest.yyy.com/kubernetes-checkpoint
>
> state.savepoints.dir: gs://storage-uswest.yyy.com/kubernetes-savepoint
>
> execution.checkpointing.interval: 3min
>
> execution.checkpointing.externalized-checkpoint-retention:
> DELETE_ON_CANCELLATION
>
> execution.checkpointing.max-concurrent-checkpoints: 1
>
> execution.checkpointing.min-pause: 0
>
> execution.checkpointing.mode: EXACTLY_ONCE
>
> execution.checkpointing.timeout: 10min
>
> execution.checkpointing.tolerable-failed-checkpoints: 0
>
> execution.checkpointing.unaligned: false
>
> restart-strategy: fixed-delay
>
> restart-strategy.fixed-delay.attempts: 10
>
> restart-strategy.fixed-delay.delay 10s
>
>
>
>   log4j.properties: |+
>
> log4j.rootLogger=INFO, file
>
> log4j.logger.akka=INFO
>
> log4j.logger.org.apache.kafka=INFO
>
> log4j.logger.org.apache.hadoop=INFO
>
> log4j.logger.org.apache.zookeeper=INFO
>
> log4j.appender.file=org.apache.log4j.FileAppender
>
> log4j.appender.file.file=${log.file}
>
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>
> log4j.appender.file.layout.ConversionPattern=%d{-MM-dd
> HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>
>
> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
> file
>
>
>
>
> ___​
> Julian   Cardarelli
> CEO
> T  *(800) 961-1549* <(800)%20961-1549>
> E *jul...@thentia.com* 
> *LinkedIn* 
> [image: Thentia Website]
> 
> DISCLAIMER
> ​
> ​Neither Thentia Corporation, nor its directors, officers, shareholders,
> representatives, employees, non-arms length companies, subsidiaries,
> parent, affiliated brands and/or agencies are licensed to provide legal
> advice. This e-mail may contain among other things legal information. We
> disclaim any and all responsibility for the content of this e-mail. YOU
> MUST NOT rely on any of our communications as legal advice. Only a licensed
> legal professional may give you advice. Our communications are never
> provided as legal advice, because we are not licensed to provide legal
> advice nor do we possess the knowledge, skills or capacity to provide legal
> advice. We disclaim any and all responsibility related to any action you
> might take based upon our communications and emphasize the need for you to
> never rely on our communications as the basis of any claim or proceeding.
>
> CONFIDENTIALITY
> ​
> ​This email and any files transmitted with it are confidential and
> intended solely for the use of the individual or entity to whom they are
> addressed. If you have received this email in error please notify the
> system manager. This message contains confidential information and is
> intended only for the individual(s) named. If you are not the named
> addressee(s) you should not disseminate, distribute or copy this e-mail.
> Please notify the sender immediately by e-mail if you have received this
> e-mail by mistake and delete this e-mail from your system. If you are not
> the intended recipient you are 

Re: Verifying max-parallelism value

2021-09-02 Thread Guowei Ma
Hi, Niklas

As far as I know, the maximum parallelism is not currently displayed on the
web ui. Maximum parallelism is the concept of operator granularity, so I
understand that it is a little difficult to show it. However, each job can
have its own default value, if not, there is a calculation method, see [1]
for details.

But if you only want to know the maximum parallelism of the operator after
the keyby, you can refer to restful api[2], this will return the
"maxParallelism" of each job vertex (actually the maximum parallelism of
the first "keyed"operator in the job vertex)
For example: http://localhost:8081/jobs/1eaf5e2ff65e199e4d8e8875882de7db

Hope it helpful.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/production_ready/#set-an-explicit-max-parallelism
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid

Best,
Guowei


On Fri, Sep 3, 2021 at 10:22 AM Caizhi Weng  wrote:

> Hi!
>
> Do you mean pipeline.max-parallelism or any other config options? If yes
> you should see it in the "Job Manager > Configuration" page of Flink UI.
>
> Which config option are you setting and how do you set that?
>
> Niklas Wilcke  于2021年9月3日周五 上午12:53写道:
>
>> Hi Flink community,
>>
>> most likely I'm missing something but I failed to verify the setting of
>> the max-parallelism (# key groups).
>> Is there a way to check the value for a job? I checked the following
>> places without finding it.
>>
>> 1. Flink UI: Job Configuration
>> 2. Flink UI: SubTasks of a Job
>> 3. Taskmanager Logs
>> 4. Exported Metrics (prom)
>>
>> I'm using Flink 1.12.2.
>>
>> It would be very nice to check that the value is properly set in
>> different environments. Maybe someone can help me out how I could do that.
>>
>> Thank you very much!
>>
>> Regards,
>> Niklas
>
>


Re: Odd Serialization exception

2021-08-12 Thread Guowei Ma
Hi,
I think you might be right. So you could try to call the super.open(...) in
your LogSessionProducer.
Best,
Guowei


On Thu, Aug 12, 2021 at 2:01 PM Daniel Vol  wrote:

> Hi Guowei,
>
> I am running on EMR 5.32.0 with Flink 1.11.2
>
> In meanwhile I did some tests and commented out part of the new code -
>
> override def invoke(value: OUT, context: SinkFunction.Context[_]): Unit = {
> try {
> //  val session = value.asInstanceOf[Session]
> //  sessionDuration = 17L //session.getSessionDuration
> //  sessionSize = 19 //session.getSessionTotalEvents
>   super.invoke(value, context)
>   sessionsWritten.inc()
> }
>
> Though I still get Caused by: org.apache.flink.util.SerializedThrowable:
> null
> So, my assumption is that something wrong with "override def open()" method
>
> Thanks!
>
> On Thu, Aug 12, 2021 at 8:44 AM Guowei Ma  wrote:
>
>> Hi, Daniel
>> Could you tell me the version of Flink you use? I want to look at the
>> corresponding code.
>> Best,
>> Guowei
>>
>>
>> On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol  wrote:
>>
>>> Hi Matthias,
>>>
>>> First, thanks for a fast reply.
>>> I am new to Flink, so probably I miss a lot in terms of flow and objects
>>> passed.
>>>
>>> The motivation is to get internal data from the transferred OUT Object
>>> to send metrics. So I do downscale it but as per my perspective it is not
>>> forwarded (super called with original value) or expected to be used in
>>> later steps (this expected to be a local scope variable)
>>> As I am suspect that you are right - can you point me to how can I get
>>> internal data from OUT without changing it or affecting next steps.
>>> As well - when I create the object - I specify OUT type (which is
>>> Session):
>>>
>>> val flinkKinesisProducer = new LogSessionProducer[*Session*](new 
>>> KinesisEventSerializer[Session], producerConfig)
>>>
>>> "… but of course I might be completely be mistaken due to incomplete
>>> information."
>>> What kind of information can I supply?
>>>
>>> Thanks a lot!
>>>
>>> Daniel
>>>
>>> On 11 Aug 2021, at 17:28, Schwalbe Matthias 
>>> wrote:
>>>
>>> 
>>>
>>> Hi Daniel,
>>>
>>>
>>>
>>> On the first look there is one thing that catches my eye:
>>>
>>> In line ‘val session = value.asInstanceOf[Session]' it looks like you
>>> are downcasting the event from OUT to Session.
>>>
>>> In Flink this is a dangerous thing to do … DataStream[OUT] uses a
>>> specific serializer[OUT] to transport events from one operator to the next
>>> (or at least from one task to the next, if configured this way).
>>>
>>> These serializers usually only understand one type, OUT in your case.
>>> Only in certain circumstances the java object (the event) is transported
>>> directly from one operator to the next.
>>>
>>>
>>>
>>> I guess this is what happened, you serializer that only understands OUT
>>> can not cope with a Session object …
>>>
>>>
>>>
>>> … but of course I might be completely be mistaken due to incomplete
>>> information.
>>>
>>>
>>>
>>> I hope this helps 
>>>
>>>
>>>
>>> Feel free to get back to me for clarifications (on the mailing list)
>>>
>>>
>>>
>>> Cheers
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *From:* Daniel Vol 
>>> *Sent:* Mittwoch, 11. August 2021 14:47
>>> *To:* user@flink.apache.org
>>> *Subject:* Odd Serialization exception
>>>
>>>
>>>
>>> I started to get the following exception:
>>>
>>>
>>>
>>> 2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(180),
>>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>>> Unnamed (1/8)] INFO
>>> o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not
>>> complete snapshot 134 for operator Window(EventTimeSessionWindows(180),
>>> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
>>> Unnamed (1/8). Failure reason: Checkpoint was declined.
>>> org.apache.flink.runtime.checkpoint.Checkpo

Re: Odd Serialization exception

2021-08-11 Thread Guowei Ma
Hi, Daniel
Could you tell me the version of Flink you use? I want to look at the
corresponding code.
Best,
Guowei


On Wed, Aug 11, 2021 at 11:23 PM Daniel Vol  wrote:

> Hi Matthias,
>
> First, thanks for a fast reply.
> I am new to Flink, so probably I miss a lot in terms of flow and objects
> passed.
>
> The motivation is to get internal data from the transferred OUT Object to
> send metrics. So I do downscale it but as per my perspective it is not
> forwarded (super called with original value) or expected to be used in
> later steps (this expected to be a local scope variable)
> As I am suspect that you are right - can you point me to how can I get
> internal data from OUT without changing it or affecting next steps.
> As well - when I create the object - I specify OUT type (which is Session):
>
> val flinkKinesisProducer = new LogSessionProducer[*Session*](new 
> KinesisEventSerializer[Session], producerConfig)
>
> "… but of course I might be completely be mistaken due to incomplete
> information."
> What kind of information can I supply?
>
> Thanks a lot!
>
> Daniel
>
> On 11 Aug 2021, at 17:28, Schwalbe Matthias 
> wrote:
>
> 
>
> Hi Daniel,
>
>
>
> On the first look there is one thing that catches my eye:
>
> In line ‘val session = value.asInstanceOf[Session]' it looks like you are
> downcasting the event from OUT to Session.
>
> In Flink this is a dangerous thing to do … DataStream[OUT] uses a specific
> serializer[OUT] to transport events from one operator to the next (or at
> least from one task to the next, if configured this way).
>
> These serializers usually only understand one type, OUT in your case. Only
> in certain circumstances the java object (the event) is transported
> directly from one operator to the next.
>
>
>
> I guess this is what happened, you serializer that only understands OUT
> can not cope with a Session object …
>
>
>
> … but of course I might be completely be mistaken due to incomplete
> information.
>
>
>
> I hope this helps 
>
>
>
> Feel free to get back to me for clarifications (on the mailing list)
>
>
>
> Cheers
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Daniel Vol 
> *Sent:* Mittwoch, 11. August 2021 14:47
> *To:* user@flink.apache.org
> *Subject:* Odd Serialization exception
>
>
>
> I started to get the following exception:
>
>
>
> 2021-08-11 09:45:30,299 [Window(EventTimeSessionWindows(180),
> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
> Unnamed (1/8)] INFO
> o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl  - Could not
> complete snapshot 134 for operator Window(EventTimeSessionWindows(180),
> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
> Unnamed (1/8). Failure reason: Checkpoint was declined.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 134 for operator Window(EventTimeSessionWindows(180),
> EventTimeTrigger, SessionAggregator, PassThroughWindowFunction) -> Sink:
> Unnamed (1/8). Failure reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
> at org.apache.flink.streaming.runtime.io
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
> at org.apache.flink.streaming.runtime.io
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
> at org.apache.flink.streaming.runtime.io
> .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
> at org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
> at org.apache.flink.streaming.runtime.io
> 

Re: Flink 1.4.1 randomically responds HTTP 500 when sending job to Job Manager

2021-06-25 Thread Guowei Ma
Hi Burcu
Could you show more logs? I could try to help find out what is happening.
But to be honest the 1.4 is too old a version that the community does not
support. You’d better upgrade to a newer version.
Best,
Guowei


On Fri, Jun 25, 2021 at 2:48 PM Burcu Gül POLAT EĞRİ 
wrote:

> Dear All,
>
> we are using Flink 1.4.1 one of our projects. We send some image
> processing jobs to our processing nodes via Flink. Flink Task Managers are
> installed on each processing nodes. And our main application sends jobs to
> Flink Job Manager and Flink Job Manager sends jobs to Flink Task Manages
> according to availability. We implement a java application(let's say node
> application) and send this application jar to nodes while sending jobs.
> Flink executes this application. And this applications executes our
> processors running on processing nodes. This was working properly but some
> how we get a wierd error sometimes these day. We can not understan why. Our
> main application send lots of jobs to Job Manager and some times it
> responds HTTP 500 with below exception. But our node application continues
> to execution. When we receive HTTP 500 we send the job again and for this
> time Job Manager returns HTTP 200. We cannot understand why we received
> HTTP 500 and below exception. This error causes to generate same images and
> our customer doesn't want to generate images more than one.
>
> 09:45:49.614 WARN  [local-cluster-thread-2]
> t.c.s.m.w.n.a.e.FlinkJobExecutor.initializeJob:977 - [PROCESS_ID:
> WFM-ba350a80-1b5a-4ca4-869a-e3c9d3a0c32d]/Cannot instantiate job in FLINK
> in 1. trial; no job identifier is provided by Flink api, please check if
> system configuration is valid and Flink is running. Flink responds with
> http response is 500. Flink return response String:
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: Could not run the jar.
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
> ... 9 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> program caused an error:
> at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
> at
> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
> ... 8 more
> Caused by:
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
> at
> org.apache.flink.client.program.OptimizerPlanEnvironment.execute(OptimizerPlanEnvironment.java:54)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
> at
> tr.com.sdt.mm.wfm.processor.api.agent.ProcessorInvokerAgent.main(ProcessorInvokerAgent.java:139)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
> at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> ... 11 more
>
> --
> BURCU
>


Re: Metric for JVM Overhaed

2021-06-25 Thread Guowei Ma
Hi Pranjul
There are already some system metrics that track the jvm
status(CPU/Memory/Threads/GC). You could find them in the [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#system-metrics
Best,
Guowei


On Fri, Jun 25, 2021 at 2:33 PM Pranjul Ahuja  wrote:

> Hi,
>
> Is there any metric to track the task manager JVM overhead? Or is it the
> case that it is already included in the metric Status.JVM.Memory.NonHeap?
>
> Thanks,
> Pranjul
>


Re: Flink State Processor API Example - Java

2021-06-24 Thread Guowei Ma
Hi Sandeep

What I understand is that you want to manipulate the state. So I think you
could use the old schema to read the state first, and then write it to a
new schema, instead of using a new schema to read an old schema format data.
In addition, I would like to ask, if you want to do "State Schema
Evolution" ? Flink currently supports avro+pojo's schema evolution[1], and
you don't need to do this manually.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#supported-data-types-for-schema-evolution

Best,
Guowei


On Fri, Jun 25, 2021 at 3:04 AM Sandeep khanzode 
wrote:

> Hello,
>
> 1.] Can someone please share a working example of how to read
> ValueState and MapState from a checkpoint and update it? I
> tried to assemble a working Java example but there are bit and pieces of
> info around.
>
> 2.] I am using Avro 1.7.7 with Flink for state entities since versions
> belong Avro 1.7.7 add a serialVersionUid and then I cannot replace the
> class with a new Avro schema seamlessly. If I update the Avro schema, and
> the Avro Maven plugin runs, a new class with a new serialVersionUid is
> created and that cannot be replaced in the state with the Java exception
> stating that local and state copies are different.  Any example would be
> really appreciated.
>
> Thanks,
> Sandip


Re: Flink checkpoint periodically fail

2021-06-24 Thread Guowei Ma
Hi Qihua
It seems that the job fail because of checkpoint timeout(10min) from the
second picture. I found that the checkpoint fail is because one of your own
customs source could not acknowledge the cp.
So I think you could add some log in your source to figure out what is
happening at the moment.
Best,
Guowei


On Fri, Jun 25, 2021 at 6:21 AM Qihua Yang  wrote:

> Hi,
> We are using flink to consume data from kafka topics and push to elastic
> search cluster. We got an issue. checkpoint success 9 times and fail 2
> times. Those failures cause the job manager to restart. That pattern
> repeats every 20 ~ 25 minutes.
> The flink job has 72 subtasks. For every failed checkpoint, there are a
> few subtasks didn't acknowledge the checkpoint.
> Flink pod cpu usage and memory usage are pretty low.
> Elastic search node cpu and memory usage are also pretty low.
>
> Does anyone know why? And how to fix it?
> Attached are the graphs
>
> Thanks,
> Qihua
>


Re: Checkpoint loading failure

2021-06-16 Thread Guowei Ma
Hi Padarn
Will there be these errors if the jobgraph is not modified?
In addition, is this error stack all? Is it possible that other errors
caused the stream to be closed?
Best,
Guowei


On Tue, Jun 15, 2021 at 9:54 PM Padarn Wilson  wrote:

> Hi all,
>
> We have a job that has a medium size state (around 4GB) and after adding a
> new part of the job graph (which should not impact the job too much) we
> found that every single checkpoint restore has the following error:
>
> Caused by: java.io.IOException: s3a://: Stream is closed!
>> at
>> org.apache.hadoop.fs.s3a.S3AInputStream.checkNotClosed(S3AInputStream.java:472)
>> at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:347)
>> at java.io.FilterInputStream.read(FilterInputStream.java:83)
>> at
>> org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
>> at
>> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
>> at
>> org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:42)
>> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:781)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
>> at
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
>> at
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
>> at
>> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
>> at
>> org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
>> at
>> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
>> at
>> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
>> at
>> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
>> at
>> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
>> at
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
>> ... 17 more
>
>
> I haven't really got any clues on what this is caused by. You notice we
> are using the Hadoop file system, but switching to Presto is a bit tricky
> for us because of some of the bucket permissions that would need to change.
>
> Anyone have tips on debugging (or solving this)?
>


Re: Got multiple issues when running the tutorial project "table-walkthrough" on IDEA

2021-06-16 Thread Guowei Ma
Hi, Lingfeng

These job errors you posted happened when the job(`SpendReport`) was
running on the IDE?
According to my understanding, this document[1] & repository[2] mean that
the example is to be run in docker, not in IDE.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/table_api/
[2] https://github.com/apache/flink-playgrounds

Best,
Guowei


On Tue, Jun 15, 2021 at 10:29 AM Lingfeng Pu  wrote:

> Hi,
>
> I'm currently trying to debug the tutorial project "table-walkthrough" on
> IDEA on a standalone Flink environment. I installed the required software
> (Java 8 or 11, Maven, Docker) according to the tutorial. I'll provide some
> key environment info about the running environment before describing the
> issues as follows:
>
> 1. OS: Fedora 34;
> 2. Java version: 1.8;
> 3.Maven version: 3.6.3;
> 4. Docker version: 20.10.7;
> 5. Flink version: 1.13.1;
> 6. Scala version: 2.11;
> 7. No Kafka, Yarn, etc. are installed.
>
> *The issues are:*
> When I run the code on IDEA, it returns me WARNs and exceptions. Due to
> this tutorial project did not require things to be installed like Kafka,
> Yarn, I couldn't find any proper solution(s) for my problem after searching
> on the web. *The full issue report is shown below:*
>
>
> *=*
> /usr/java/jdk1.8.0_291-amd64/bin/java
> -javaagent:/var/lib/snapd/snap/intellij-idea-community/302/lib/idea_rt.jar=39087:/var/lib/snapd/snap/intellij-idea-community/302/bin
> -Dfile.encoding=UTF-8 -classpath
> 

Re: Discard checkpoint files through a single recursive call

2021-06-15 Thread Guowei Ma
hi, Jiang

I am afraid of misunderstanding what you mean, so can you elaborate on how
you want to change it? For example, which interface or class do you want to
add a method to?
Although I am not a state expert, as far as I know, due to incremental
checkpoints, when CompleteCheckpoint is discarding, it is necessary to call
the discardState method of each State.

Best,
Guowei


On Tue, Jun 15, 2021 at 7:37 AM Jiahui Jiang 
wrote:

> Hello Flink!
>
> We are building an infrastructure where we implement our own
> CompletedCheckpointStore. The read and write to the external storage
> location of these checkpoints are through HTTP calls to an external service.
>
> Recently we noticed some checkpoint file cleanup performance issue when
> the job writes out a very high number of checkpoint files per checkpoint.
> (In our case we had a few hundreds of operators and ran with 16
> parallelism)
> During checkpoint state discard phase, since the implementation in
> CompletedCheckpoint discards the state files one by one, we are seeing a
> very high number of remote calls. Sometimes the deletion fails to catch up
> with the checkpoint progress.
>
> Given the interface we are given to configure the external storage
> location for checkpoints is always a `target directory`. Would it be
> reasonable to expose an implementation of discard() that directly calls
> disposeStorageLocation with recursive set to true, without iterating over
> each individual files first? Is there any blockers for that?
>
> Thank you!
>
>
> links
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L240
>
> https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java#L70
>


Re: DataStream API in Batch Execution mode

2021-06-07 Thread Guowei Ma
Hi, Macro

I think you could try the `FileSource` and you could find an example from
[1]. The `FileSource` would scan the file under the given
directory recursively.
Would you mind opening an issue for lacking the document?

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
Best,
Guowei


On Tue, Jun 8, 2021 at 5:59 AM Marco Villalobos 
wrote:

> How do I use a hierarchical directory structure as a file source in S3
> when using the DataStream API in Batch Execution mode?
>
> I have been trying to find out if the API supports that, because currently
> our data is organized by years, halves, quarters, months, and but before I
> launch the job, I flatten the file structure just to process the right set
> of files.
>
>
>


Re: How do you debug a DataStream flat join on common window?

2021-05-24 Thread Guowei Ma
Hi,

Would you like to share your code?  It is very helpful to verify the
problem.
I think you could use the `JoinedStream.with().uid(xxx)` to set the
name/UID .

Best,
Guowei


On Mon, May 24, 2021 at 2:36 PM Marco Villalobos 
wrote:

> Hi,
>
> Stream one has one element.
> Stream two has 2 elements.
>
> Both streams derive from side-outputs. I am using the DataStream API in
> batch execution mode.
>
> I want to join them on a common key and window.
> I am certain the keys match, but the flat join does not seem to be working.
> I deduce that their windows are not matching.
> How can I debug which window they were assigned to?
> Also, how can I name or UID a joined stream?
>
> Any advice would be appreciated. Thank you.
>
>
>
>
>


Re: Task not serializable when logging in a trait method

2021-05-24 Thread Guowei Ma
Hi, Yik San

You need to change the following line:
  protected final val LOG = LoggerFactory.getLogger(getClass)
protected *static* final val LOG = LoggerFactory.getLogger(getClass)

Best,
Guowei


On Mon, May 24, 2021 at 2:41 PM Yik San Chan 
wrote:

> Hi community,
>
> I have a job that consumes data from a datagen source, tries to log
> something in `map` operators, and sinks the result to a DiscardingSink. The
> full example can be found in [the repo](
> https://github.com/YikSanChan/log-in-flink-operator).
>
> The `Job` extends `BaseJob` where `preprocess` and `process` methods are
> defined.
>
> BaseJob.scala
> ```scala
>
> import org.apache.flink.streaming.api.datastream.DataStreamSink
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink
> import org.apache.flink.streaming.api.scala.DataStream
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
> import org.slf4j.LoggerFactory
>
> trait BaseJob {
>   protected final val LOG = LoggerFactory.getLogger(getClass)
>
>   def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]
>
>   def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
> stream
>   .map { a =>
> // This LOG line throws Task not serializable
> // Commenting out the LOG line, then the LOG line in Job.scala works 
> just fine
> LOG.info("[BaseJob] a = " + a)
> a
>   }
>   .addSink(new DiscardingSink[AnyRef])
>   }
> }
>
> ```
>
> Job.scala
> ```scala
>
> import org.apache.flink.api.common.JobExecutionResult
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.EnvironmentSettings
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>
> object Job extends BaseJob {
>
>   private val CreateSource =
> """
>   |CREATE TABLE source (
>   |  a int
>   |) WITH (
>   |  'connector' = 'datagen',
>   |  'rows-per-second' = '5'
>   |)
>   |""".stripMargin
>
>   private def run(): JobExecutionResult = {
> val settings = EnvironmentSettings.newInstance.build
> val execEnv: StreamExecutionEnvironment =
>   StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(execEnv, settings)
> val stream = preprocess(tableEnv)
> process(stream)
> execEnv.execute("Streaming")
>   }
>
>   override def preprocess(tableEnv: StreamTableEnvironment): 
> DataStream[AnyRef] = {
> tableEnv.executeSql(CreateSource)
> val table = tableEnv.sqlQuery("SELECT a FROM source")
> tableEnv
>   .toDataStream(table)
>   .map {row =>
> val a = row.getField("a")
> // This LOG line works just fine!
> LOG.info("[Job] a = " + a)
> a
>   }
>   }
>
>   def main(args: Array[String]): Unit = {
> run()
>   }
> }
>
> ```
>
> It is very odd that, the LOG line in Job.scala works just fine, while the
> LOG line in BaseJob.scala complains that:
>
> ```
>
> Exception in thread "main" 
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>at 
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>at 
> org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>at 
> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:914)
>at 
> org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1189)
>at 
> org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:623)
>at BaseJob$class.process(BaseJob.scala:15)
>at Job$.process(Job.scala:7)
>at Job$.run(Job.scala:25)
>at Job$.main(Job.scala:42)
>at Job.main(Job.scala)
> Caused by: java.io.NotSerializableException: Job$
>at 
> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
>at 
> java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
>at 
> java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
>at 
> java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
>at 
> java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
>at 
> java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
>at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>at 
> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
>... 10 more
>
> ```
>
> I wonder why, and how to resolve this issue, as I do want to LOG in the
> BaseJob? Thanks!
>
> Best,
> Yik San
>


Re: count the amount of data successfully processed by flink

2021-05-24 Thread Guowei Ma
Hi
I think you are right that the metrics are reset after the job restart. It
is because the metrics are only stored in the memory.
I think you could store the metrics to the Flink's state[1], which could be
restored after the job restarted.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/
Best,
Guowei


On Mon, May 24, 2021 at 10:59 AM zzzyw  wrote:

> Dear all:
>   I use flink for real-time data synchronization(mysql,oracle --> kafka -->
> mysql,oracle). I want to count how many pieces of data are synchronized
> every day(maybe need to count the last n days ).
>
>   I am doing this now: flink metrics send to pushgateway, and then sum the
> metrics (flink_taskmanager_job_task_numRecordsOut)  to count how many
> pieces
> of data are synchronized every day,
> but i found this metrics will reset after the flink job restart, how should
> I deal with this problem? or is there any better way to count?
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t3167/Snipaste_2021-05-24_10-51-22.png>
>
>
> Best regards
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Root Exception can not be shown on Web UI in Flink 1.13.0

2021-05-13 Thread Guowei Ma
Hi, Gary

I think it might be a bug. So would you like to open a jira for this.
And could you share the exception ,which the TaskManagerLocation is null?
It might be very helpful to verify the cause.

Best,
Guowei


On Thu, May 13, 2021 at 10:36 AM Yangze Guo  wrote:

> Hi, it seems to be related to FLINK-22276. Thus, I'd involve Matthias
> to take a look.
>
> @Matthias My gut feeling is that not all execution who has failureInfo
> has been deployed?
>
> Best,
> Yangze Guo
>
> On Wed, May 12, 2021 at 10:12 PM Gary Wu  wrote:
> >
> > Hi,
> >
> > We have upgraded our Flink applications to 1.13.0 but we found that Root
> Exception can not be shown on Web UI with an internal server error message.
> After opening browser development console and trace the message, we found
> that there is a exception in jobmanager:
> >
> > 2021-05-12 13:30:45,589 ERROR
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] -
> Unhandled exception.
> > java.lang.IllegalArgumentException: The location must not be null for a
> non-global failure.
> > at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.assertLocalExceptionInfo(JobExceptionsHandler.java:218)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createRootExceptionInfo(JobExceptionsHandler.java:191)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> ~[?:?]
> > at java.util.stream.SliceOps$1$1.accept(SliceOps.java:199) ~[?:?]
> > at
> java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1632)
> ~[?:?]
> > at
> java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
> ~[?:?]
> > at
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
> ~[?:?]
> > at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488) ~[?:?]
> > at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> ~[?:?]
> > at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> ~[?:?]
> > at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
> > at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> ~[?:?]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionHistory(JobExceptionsHandler.java:169)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionsInfo(JobExceptionsHandler.java:154)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:101)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:63)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> > at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> [?:?]
> > at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> [?:?]
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> [?:?]
> > at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> > at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> [?:?]
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
> > at java.lang.Thread.run(Thread.java:834) [?:?]
> >
> > We would like to check Is there any configuration change should be done
> for the application? Thanks!
> >
> > Regards,
> > -Gary
> >
> >
> >
> > APPIER EMAIL NOTICE
> >
> > The contents of this email message and any attachments from Appier Group
> Inc. and/or its affiliates may be privileged and confidential. If you are
> not the intended recipient of this email, please note that any disclosure,
> copying, distribution, or use of this message or its attachments is
> prohibited. If you have received this email in error, please contact us
> immediately and delete this message and any attachments.
>


Re: How does JobManager terminate dangling task manager

2021-05-13 Thread Guowei Ma
Hi,
In fact, not only JobManager(ResoruceManager) will kill TimeOut's
TaskManager, but if TaskManager finds that it cannot connect to
JobManager(ResourceManager), it will also exit by itself.
You can look at the time period during which the HB timeout occurred and
what happened in the log. Under normal circumstances, I also look at what
the GC situation was like at that time.
Best,
Guowei


On Thu, May 13, 2021 at 11:06 AM narasimha  wrote:

> Hi,
>
> Trying to understand how JobManager. kills TaskManager that didn't respond
> for heartbeat after a certain time.
>
> For example:
>
> If a network connection b/w JobManager and TaskManager is lost for some
> reasons, the JobManager will bring up another Taskmanager post
> hearbeat timeout.
> In such a case, how does JobManager make sure all connections like to
> Kafka from lost Taskmanager are cut down and the new one will take from a
> certain consistent point.
>
> Also want to learn ways to debug what caused the timeout, our job fairly
> handles 5k records/s, not a heavy traffic job.
> --
> A.Narasimha Swamy
>


Re: Flink sql task failure recovery does not work.

2021-05-13 Thread Guowei Ma
Hi
I think you could configure some restart strategy[1] likes
restart-strategy: fixed-delay

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#fixed-delay-restart-strategy
Best,
Guowei


On Thu, May 13, 2021 at 12:02 PM 1095193...@qq.com <1095193...@qq.com>
wrote:

> Hi team,
>Following Task Failure Recovery document
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/
>  ,  I have  enabled state.checkpoints.dir parameter in flink-conf.yaml.
>
> state.checkpoints.dir: hdfs://172.16.1.192:9000/flink-checkpoints
> state.savepoints.dir: hdfs://172.16.1.192:9000/flink-savepoints
>
> However, Flink sql Task still throw Exception 
> org.apache.flink.runtime.JobException:
> Recovery is suppressed by NoRestartBackoffTimeStrategy.  Apparently, no
> restart strategy enabled. Do we have enabled other configuration rather
> than state.checkpoints.dir.  Thanks for any suggestions.
>
> --
> 1095193...@qq.com
>


Re: Need Clarity about Checkpoint for Flink-1.12.2

2021-05-13 Thread Guowei Ma
Hi Sudhansu,
I think you do not need to set the config in flink-conf.
Best,
Guowei


On Thu, May 13, 2021 at 1:06 PM sudhansu jena 
wrote:

> Hi Team,
>
> We have recently enabled Check Pointing in our flink job using
> FSStateBackend pointing to S3 bucket.
>
> Below is the sample code for enabling check pointing though app code and
> we are using flink version - 1.12.2 .
>
> env.setStateBackend(new
> FsStateBackend("s3://flinkcheckpointing/job-name/",true));
> env.enableCheckpointing(1000);
> Class unmodColl =
> Class.forName("java.util.Collections$UnmodifiableCollection");
> env.getConfig().addDefaultKryoSerializer(unmodColl,
> UnmodifiableCollectionsSerializer.class);
> CheckpointConfig config = env.getCheckpointConfig();
>
> config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
>
> The query is , do we still need to set the below config in flink-conf.yaml
> for checkpointing to work.
>
> *state.checkpoints.dir: s3://prod-flink-checkpointing/checkpoint-metadata/*
>
>
> Thanks,
> Sudhansu
>


Re: docker based taskmanager can't connect to job/resource manager

2021-05-12 Thread Guowei Ma
Hi,

Here is the link[1].

[image: image.png]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#starting-a-session-cluster-on-docker

Best,
Guowei


On Thu, May 13, 2021 at 1:53 PM guenterh.lists 
wrote:

> Hi Guowei,
>
> thanks for your reply! This information was still missing. The presenter
> mentioned the documentation but I hadn't found it. So your link to the
> specific place is valuable too.
>
> Günter
> On 13.05.21 06:09, Guowei Ma wrote:
>
> Hi,
> I do not try it. But from the documentation[1] it seems that you might
> need add the "jobmanager.rpc.address: jobmanager" to the FLINK_PROPERTIES
> before creating a network.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/
>
> Best,
> Guowei
>
>
> On Thu, May 13, 2021 at 3:56 AM guenterh.lists 
> wrote:
>
>> Hi,
>>
>> I'm trying to start a mini cluster following the explanations given in a
>> flink forward presentation [1]
>>
>> Starting a jobmanager task is possible
>>
>> FLINK_PROPERTIES="jobmanager.memory.process.size: 2048m
>> parallelism.default: 4
>> "
>> docker network create flink-network
>>
>> docker run  \
>> --rm   \
>> --name=jobmanager  \
>> --network flink-network \
>> -p 8081:8081  \
>> --env FLINK_PROPERTIES="${FLINK_PROPERTIES}"  \
>> flink:1.13.0-scala_2.12-java11 jobmanager
>>
>>
>> Unfortunately the taskmanager process can't connect
>>
>> docker run  \
>> --rm   \
>> --name=taskmanager  \
>> --network flink-network \
>> --env FLINK_PROPERTIES="${FLINK_PROPERTIES}"  \
>> flink:1.13.0-scala_2.12-java11 taskmanager
>>
>> 2021-05-12 19:43:11,396 INFO
>> org.apache.flink.runtime.net.ConnectionUtils [] - Failed
>> to connect from address '/172.20.0.3': Connection refused (Connection
>> refused)
>> 2021-05-12 19:44:26,082 WARN
>> akka.remote.transport.netty.NettyTransport   [] - Remote
>> connection to [null] failed with java.net.ConnectException: Connection
>> refused: 5e8efb79f191/172.20.0.3:6123
>> 2021-05-12 19:44:26,084 INFO
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
>> not resolve ResourceManager address
>> akka.tcp://flink@5e8efb79f191:6123/user/rpc/resourcemanager_*, retrying
>> in 1 ms: Could not connect to rpc endpoint under address
>> akka.tcp://flink@5e8efb79f191:6123/user/rpc/resourcemanager_*.
>> 2021-05-12 19:44:26,084 WARN
>> akka.remote.ReliableDeliverySupervisor   [] -
>> Association with remote system [akka.tcp://flink@5e8efb79f191:6123] has
>> failed, address is now gated for [50] ms. Reason: [Association failed
>> with [akka.tcp://flink@5e8efb79f191:6123]] Caused by:
>> [java.net.ConnectException: Connection refused:
>> 5e8efb79f191/172.20.0.3:6123]
>>
>> and the dashboard (of the jobmanager task) doesn't show the taskmanager
>> process (as I would expect)
>>
>> Any hints? - Thanks!
>>
>> Günter
>>
>>
>> [1]
>>
>> https://www.youtube.com/watch?v=VVh6ikd-l9s=PLDX4T_cnKjD054YExbUOkr_xdYknVPQUm=45
>> "Flink's New Dockerfile: One File to Rule Them All"
>>
>>


Re: docker based taskmanager can't connect to job/resource manager

2021-05-12 Thread Guowei Ma
Hi,
I do not try it. But from the documentation[1] it seems that you might need
add the "jobmanager.rpc.address: jobmanager" to the FLINK_PROPERTIES before
creating a network.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/

Best,
Guowei


On Thu, May 13, 2021 at 3:56 AM guenterh.lists 
wrote:

> Hi,
>
> I'm trying to start a mini cluster following the explanations given in a
> flink forward presentation [1]
>
> Starting a jobmanager task is possible
>
> FLINK_PROPERTIES="jobmanager.memory.process.size: 2048m
> parallelism.default: 4
> "
> docker network create flink-network
>
> docker run  \
> --rm   \
> --name=jobmanager  \
> --network flink-network \
> -p 8081:8081  \
> --env FLINK_PROPERTIES="${FLINK_PROPERTIES}"  \
> flink:1.13.0-scala_2.12-java11 jobmanager
>
>
> Unfortunately the taskmanager process can't connect
>
> docker run  \
> --rm   \
> --name=taskmanager  \
> --network flink-network \
> --env FLINK_PROPERTIES="${FLINK_PROPERTIES}"  \
> flink:1.13.0-scala_2.12-java11 taskmanager
>
> 2021-05-12 19:43:11,396 INFO
> org.apache.flink.runtime.net.ConnectionUtils [] - Failed
> to connect from address '/172.20.0.3': Connection refused (Connection
> refused)
> 2021-05-12 19:44:26,082 WARN
> akka.remote.transport.netty.NettyTransport   [] - Remote
> connection to [null] failed with java.net.ConnectException: Connection
> refused: 5e8efb79f191/172.20.0.3:6123
> 2021-05-12 19:44:26,084 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
> not resolve ResourceManager address
> akka.tcp://flink@5e8efb79f191:6123/user/rpc/resourcemanager_*, retrying
> in 1 ms: Could not connect to rpc endpoint under address
> akka.tcp://flink@5e8efb79f191:6123/user/rpc/resourcemanager_*.
> 2021-05-12 19:44:26,084 WARN
> akka.remote.ReliableDeliverySupervisor   [] -
> Association with remote system [akka.tcp://flink@5e8efb79f191:6123] has
> failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@5e8efb79f191:6123]] Caused by:
> [java.net.ConnectException: Connection refused:
> 5e8efb79f191/172.20.0.3:6123]
>
> and the dashboard (of the jobmanager task) doesn't show the taskmanager
> process (as I would expect)
>
> Any hints? - Thanks!
>
> Günter
>
>
> [1]
>
> https://www.youtube.com/watch?v=VVh6ikd-l9s=PLDX4T_cnKjD054YExbUOkr_xdYknVPQUm=45
> "Flink's New Dockerfile: One File to Rule Them All"
>
>


Re: How to config the flink to load libs in myself path

2021-04-19 Thread Guowei Ma
Hi, chenxuying
There is currently no official support for this.
What I am curious about is why you have this requirement. In theory, you
can always build your own image.
Best,
Guowei


On Mon, Apr 19, 2021 at 9:58 PM chenxuying  wrote:

> Hi all, I deployed the flink in K8S by session cluster [1]
> the default plugin path is /opt/flink/plugins,
> the default lib path is /opt/flink/lib,
> the default usrlib path is /opt/flink/usrlib,
> I wonder if it is possible for change the default path.
> For example, I wish flink don't load libs from /opt/flink/lib , and my want
> it to load libs files from /data/flink/lib.  and I can't move
> /data/flink/lib to /opt/flink/lib
> So how to config the flink to load lib in myself path
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#session-cluster-resource-definitions
>
>
>
>
>


Re: Max-parellelism limitation

2021-04-19 Thread Guowei Ma
Hi, Olivier
Yes. The introduction of this concept is to solve the problem of rescaling
the keystate.
Best,
Guowei


On Mon, Apr 19, 2021 at 8:56 PM Olivier Nouguier 
wrote:

> Hi,
>   May I have the confirmation that the max-parallelism limitation only
> occurs when keyed states are used ?
>
>
> --
>
> Olivier Nouguier
>
> SSE
>
> e | olivier.nougu...@teads.com m | 0651383971
>
> Teads France SAS, 159 rue de Thor, Business Plaza, Bat. 4, 34000
> Montpellier, France
> [image: image] 
>
> The information in this email is confidential and intended only for the
> addressee(s) named above. If you are not the intended recipient any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it is prohibited and may be unlawful. Teads does not
> warrant that any attachment(s) are free from viruses or other defects and
> accept no liability for any losses resulting from infected email
> transmission. Please note that any views expressed in this email may be
> those of the originator and do not necessarily reflect those of the
> organization.
>


Re: Accessing columns from input stream table during Window operations

2021-04-19 Thread Guowei Ma
Hi, Sumeet
Thanks you for the sharing. As Dian suggested, I think you could use b as
your `group_by`'s key and so the b could be output directly.
I think it is more simple.
Best,
Guowei


On Mon, Apr 19, 2021 at 7:31 PM Dian Fu  wrote:

> Hi Sumeet,
>
> Thanks for the sharing.
>
> Then I guess you could use `.group_by(col('w'), input.a, input.b)`. Since
> the value for input.a is always the same, it’s equal to group_by(col(‘w')
> , input.b) logically. The benefit is that you could access input.a
> directly in the select clause.
>
> Regards,
> Dian
>
> 2021年4月19日 下午6:29,Sumeet Malhotra  写道:
>
> Hi Guowei,
>
> Let me elaborate the use case with an example.
>
> Sample input table looks like this:
>
> timea   b   c
> -
> t0  a0  b0  1
> t1  a0  b1  2
> t2  a0  b2  3
> t3  a0  b0  6
> t4  a0  b1  7
> t5  a0  b2  8
>
> Basically, every time interval there are new readings from a fixed set of
> sensors (b0, b1 and b2). All these rows have a few constant fields
> representing metadata about the input (a0).
>
> Desired output for every time interval is the average reading for every
> sensor (b0, b1, b2), along with the constant metadata (a0):
>
> a0b0avg(c)
> a0b1avg(c)
> a0b2avg(c)
>
> This is what I was trying to build using a simple Tumble window:
>
> input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w"))
> \
> .group_by(col('w'), input.b) \
> .select(
> input.a,<=== constant metadata field,
> same for every input record
> input.b,<=== group_by field, to
> compute averages
> input.c.avg.alias('avg_value')) \
> .execute_insert('MySink') \
> .wait()
>
> The example above is highly simplified, but I hope it explains what I'm
> trying to achieve.
>
> Thanks,
> Sumeet
>
>
> On Mon, Apr 19, 2021 at 3:21 PM Dian Fu  wrote:
>
>> Hi Sumeet,
>>
>> 1) Regarding to the above exception, it’s a known issue and has been
>> fixed in FLINK-21922 <https://issues.apache.org/jira/browse/FLINK-21922> 
>> [1]. It
>> will be available in the coming 1.12.3. You could also cherry-pick that fix
>> to 1.12.2 and build from source following the instruction described in [2]
>> if needed.
>>
>> 2) Regarding to your requirements, could you describe what you want to do
>> with group window or over window?
>> For group window(e.g. tumble window, hop window, session window, etc), it
>> will output one row for multiple inputs belonging to the same window. You
>> could not just passing through it from input to sink as it is
>> non-determinitic which row to use as there are multiple input rows. That’s
>> the reason why you have to declare a field in the group by clause if you
>> want to access it directly in the select clause. For over window, it will
>> output one row for each input and so you could pass through it directly.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21922.
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>>
>>
>> 2021年4月19日 下午5:16,Sumeet Malhotra  写道:
>>
>> Thanks Guowei. I'm trying out Over Windows, as follows:
>>
>> input \
>> .over_window(
>> Over.partition_by(col(input.a)) \
>> .order_by(input.Timestamp) \
>> .preceding(lit(10).seconds) \
>> .alias('w')) \
>> .select(
>> input.b,
>> input.c.avg.over(col('w'))) \
>> .execute_insert('MySink') \
>> .wait()
>>
>> But running into following exception:
>>
>> py4j.protocol.Py4JError: An error occurred while calling
>> z:org.apache.flink.table.api.Over.partitionBy. Trace:
>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
>> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>>
>> Is there any extra Jar that needs to be included for Over Windows. From
>> the code it doesn't appear so.
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma  wrote:
>>
>>> Hi, Sumeet
>>>
>>> Maybe you could try the Over Windows[1], which could keep the
>>> "non-group-key" column.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Apr 19, 2021 at 3:25 PM Sumeet 

Re: Batch Task Synchronization

2021-04-19 Thread Guowei Ma
Hi, Mary
 Flink has an alignment mechanism for synchronization. All upstream
taks (for example reduce1) will send a message after the end of a round
 to inform all downstream that he has processed all the data. When the
downstream (reduce2) collected all the messages from all his upstream
tasks,
 it(reduce2) knew that all the data was collected. After that,
it(reduce2) could process all its inputs.
 Hope it helps you.
Best,
Guowei


On Mon, Apr 19, 2021 at 5:17 PM Maria Xekalaki <
maria.xekal...@manchester.ac.uk> wrote:

> Hi All,
>
> This is more of a general question. How are tasks synchronized in batch
> execution? If, for example, we ran an iterative pipeline (map1 -> reduce1
> -> reduce2 -> map2), and the first two operators (map1->reduce1) were
> chained, how would reduce2 be notified that map1 -> reduce1 have completed
> their execution so as to start reading its input data? I noticed that in
> the driver classes (MapDriver, ChainedReduceDriver etc.) there are input
> and output counters (numRecordsOut, numRecordsIn). Are these used to check
> if an operator has consumed all of its data?
>
> Thank you in advance.
>
> Best Wishes,
> Mary
>


Re: Accessing columns from input stream table during Window operations

2021-04-19 Thread Guowei Ma
Hi, Sumeet

Maybe you could try the Over Windows[1], which could keep the
"non-group-key" column.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows

Best,
Guowei


On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra 
wrote:

> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause
> any issues. It's only when I want to use "input.b".
>
> My use case is to basically emit "input.b" in the final sink as is, and
> not really perform any aggregation on that column - more like pass through
> from input to sink. What's the best way to achieve this? I was thinking
> that making it part of the select() clause would do it, but as you said
> there needs to be some aggregation performed on it.
>
> Thanks,
> Sumeet
>
>
> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma  wrote:
>
>> Hi, Sumeet
>>   For "input.b" I think you should aggregate the non-group-key
>> column[1].
>> But I am not sure why the "input.c.avg.alias('avg_value')"  has resolved
>> errors. Would you mind giving more detailed error information?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>>
>> Best,
>> Guowei
>>
>>
>> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <
>> sumeet.malho...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a use case where I'm creating a Tumbling window as follows:
>>>
>>> "input" table has columns [Timestamp, a, b, c]
>>>
>>> input \
>>> .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w"))
>>> \
>>> .group_by(col('w'), input.a) \
>>> .select(
>>> col('w').start.alias('window_start'),
>>> col('w').end.alias('window_end'),
>>> input.b,
>>> input.c.avg.alias('avg_value')) \
>>> .execute_insert('MySink') \
>>> .wait()
>>>
>>> This throws an exception that it cannot resolve the fields "b" and "c"
>>> inside the select statement. If I mention these column names inside the
>>> group_by() statement as follows:
>>>
>>> .group_by(col('w'), input.a, input.b, input.c)
>>>
>>> then the column names in the subsequent select statement can be resolved.
>>>
>>> Basically, unless the column name is explicitly made part of the
>>> group_by() clause, the subsequent select() clause doesn't resolve it. This
>>> is very similar to the example from Flink's documentation here [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples,
>>> where a similar procedure works.
>>>
>>> Any idea how I can access columns from the input stream, without having
>>> to mention them in the group_by() clause? I really don't want to group the
>>> results by those fields, but they need to be written to the sink eventually.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>


Re: Accessing columns from input stream table during Window operations

2021-04-19 Thread Guowei Ma
Hi, Sumeet
  For "input.b" I think you should aggregate the non-group-key
column[1].
But I am not sure why the "input.c.avg.alias('avg_value')"  has resolved
errors. Would you mind giving more detailed error information?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows

Best,
Guowei


On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra 
wrote:

> Hi,
>
> I have a use case where I'm creating a Tumbling window as follows:
>
> "input" table has columns [Timestamp, a, b, c]
>
> input \
> .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
> .group_by(col('w'), input.a) \
> .select(
> col('w').start.alias('window_start'),
> col('w').end.alias('window_end'),
> input.b,
> input.c.avg.alias('avg_value')) \
> .execute_insert('MySink') \
> .wait()
>
> This throws an exception that it cannot resolve the fields "b" and "c"
> inside the select statement. If I mention these column names inside the
> group_by() statement as follows:
>
> .group_by(col('w'), input.a, input.b, input.c)
>
> then the column names in the subsequent select statement can be resolved.
>
> Basically, unless the column name is explicitly made part of the
> group_by() clause, the subsequent select() clause doesn't resolve it. This
> is very similar to the example from Flink's documentation here [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples,
> where a similar procedure works.
>
> Any idea how I can access columns from the input stream, without having to
> mention them in the group_by() clause? I really don't want to group the
> results by those fields, but they need to be written to the sink eventually.
>
> Thanks,
> Sumeet
>


Re: [DISCUSS] Feature freeze date for 1.13

2021-04-01 Thread Guowei Ma
Hi, Yuval

Thanks for your contribution. I am not a SQL expert, but it seems to be
beneficial to users, and the amount of code is not much and only left is
the test. Therefore, I am open to this entry into rc1.
But according to the rules, you still have to see if there are other PMC's
objections within 48 hours.

Best,
Guowei


On Thu, Apr 1, 2021 at 10:33 PM Yuval Itzchakov  wrote:

> Hi All,
>
> I would really love to merge https://github.com/apache/flink/pull/15307
> prior to 1.13 release cutoff, it just needs some more tests which I can
> hopefully get to today / tomorrow morning.
>
> This is a critical fix as now predicate pushdown won't work for any stream
> which generates a watermark and wants to push down predicates.
>
> On Thu, Apr 1, 2021, 10:56 Kurt Young  wrote:
>
>> Thanks Dawid, I have merged FLINK-20320.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Apr 1, 2021 at 2:49 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi all,
>>>
>>> @Kurt @Arvid I think it's fine to merge those two, as they are pretty
>>> much finished. We can wait for those two before creating the RC0.
>>>
>>> @Leonard Personally I'd be ok with 3 more days for that single PR. I
>>> find the request reasonable and I second that it's better to have a proper
>>> review rather than rush unfinished feature and try to fix it later.
>>> Moreover it got broader support. Unless somebody else objects, I think we
>>> can merge this PR later and include it in RC1.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 01/04/2021 08:39, Arvid Heise wrote:
>>>
>>> Hi Dawid and Guowei,
>>>
>>> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We are
>>> pretty much just waiting for AZP to turn green, it's separate from other
>>> components, and it's a super useful feature for Flink users.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> [1] https://github.com/apache/flink/pull/15054
>>>
>>> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:
>>>
>>>> Hi Guowei and Dawid,
>>>>
>>>> I want to request the permission to merge this feature [1], it's a
>>>> useful improvement to sql client and won't affect
>>>> other components too much. We were plan to merge it yesterday but met
>>>> some tricky multi-process issue which
>>>> has a very high possibility hanging the tests. It took us a while to
>>>> find out the root cause and fix it.
>>>>
>>>> Since it's not too far away from feature freeze and RC0 also not
>>>> created yet, thus I would like to include this
>>>> in 1.13.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-20320
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:
>>>>
>>>>> Hi, community:
>>>>>
>>>>> Friendly reminder that today (3.31) is the last day of feature
>>>>> development. Under normal circumstances, you will not be able to submit 
>>>>> new
>>>>> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
>>>>> testing, welcome to help test together.
>>>>> After the test is relatively stable, we will cut the release-1.13
>>>>> branch.
>>>>>
>>>>> Best,
>>>>> Dawid & Guowei
>>>>>
>>>>>
>>>>> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
>>>>> wrote:
>>>>>
>>>>>> +1 for the 31st of March for the feature freeze.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
>>>>>> wrote:
>>>>>>
>>>>>> > +1 for March 31st for the feature freeze.
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
>>>>>> dwysakow...@apache.org>
>>>>>> > wrote:
>>>>>> >
>>>>>> > > Thank you Thomas! I'll definitely check the issue you linked.
>>>>>> > >
>>>>>> > > Best,
>>>>>> > >
>>>>>> > > Dawid
>>>>>> > >
>>>>>> > >

Re: Flink Taskmanager failure recovery and large state

2021-04-01 Thread Guowei Ma
Hi, Yaroslav

AFAIK Flink does not retry if the download checkpoint from the storage
fails. On the other hand the FileSystem already has this retry mechanism
already. So I think there is no need for flink to retry.
I am not very sure but from the log it seems that the gfs's retry is
interrupted by some reason. So I think we could get more insight if we
could find the first fail cause.

Best,
Guowei


On Fri, Apr 2, 2021 at 12:07 AM Yaroslav Tkachenko <
yaroslav.tkache...@shopify.com> wrote:

> Hi Guowei,
>
> I thought Flink can support any HDFS-compatible object store like the
> majority of Big Data frameworks. So we just added
> "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2"
> dependencies to the classpath, after that using "gs" prefix seems to be
> possible:
>
> state.checkpoints.dir: gs:///flink-checkpoints
> state.savepoints.dir: gs:///flink-savepoints
>
> And yes, I noticed that retries logging too, but I'm not sure if it's
> implemented on the Flink side or the GCS connector side? Probably need to
> dive deeper into the source code. And if it's implemented on the GCS
> connector side, will Flink wait for all the retries? That's why I asked
> about the potential timeout on the Flink side.
>
> The JM log doesn't have much besides from what I already posted. It's hard
> for me to share the whole log, but the RocksDB initialization part can be
> relevant:
>
> 16:03:41.987 [cluster-io-thread-3] INFO
>  org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to
> configure application-defined state backend:
> RocksDBStateBackend{checkpointStreamBackend=File State Backend
> (checkpoints: 'gs:///flink-checkpoints', savepoints:
> 'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
> 1048576), localRocksDbDirectories=[/rocksdb],
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
> writeBatchSize=2097152}
> 16:03:41.988 [cluster-io-thread-3] INFO
>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
> predefined options: FLASH_SSD_OPTIMIZED.
> 16:03:41.988 [cluster-io-thread-3] INFO
>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
> application-defined options factory:
> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4,
> state.backend.rocksdb.block.blocksize=16 kb,
> state.backend.rocksdb.block.cache-size=64 mb}}.
> 16:03:41.988 [cluster-io-thread-3] INFO
>  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined
> state backend: RocksDBStateBackend{checkpointStreamBackend=File State
> Backend (checkpoints: 'gs:///flink-checkpoints', savepoints:
> 'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
> 1048576), localRocksDbDirectories=[/rocksdb],
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
> writeBatchSize=2097152}
>
> Thanks!
>
> On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma  wrote:
>
>> Hi, Yaroslav
>>
>> AFAIK there is no official GCS FileSystem support in FLINK.  Does the GCS
>> is implemented by yourself?
>> Would you like to share the whole log of jm?
>>
>> BTW: From the following log I think the implementation has already some
>> retry mechanism.
>> >>> Interrupted while sleeping before retry. Giving up after 1/10 retries
>> for 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
>> yaroslav.tkache...@shopify.com> wrote:
>>
>>> Hi everyone,
>>>
>>> I'm wondering if people have experienced issues with Taskmanager failure
>>> recovery when dealing with a lot of state.
>>>
>>> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints
>>> and checkpoints. ~150 task managers with 4 slots each.
>>>
>>> When I run a pipeline without much state and kill one of the
>>> taskmanagers, it takes a few minutes to recover (I see a few restarts), but
>>> eventually when a new replacement taskmanager is registered with the
>>> jobmanager things go back to healthy.
>>>
>>> But when I run a pipeline with a lot of state (1TB+) and kill one of the
>>> taskmanagers, the pipeline never recovers, even after the replacement
>>> taskmanager has joined. It just enters an infinite loop of restarts and
>>> failures.
>>>
>>> On the jobmanager, I see an endless loop of state transitions: RUNNING
>>> -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
>>> It stays in RUNNING for a few seconds, but th

Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

2021-04-01 Thread Guowei Ma
 
> org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]
> Caused by: org.apache.flink.util.SerializedThrowable: The AWS Access Key Id 
> you provided does not exist in our records. (Service: Amazon S3; Status Code: 
> 403; Error Code: InvalidAccessKeyId; Request ID: RMD85E1G3WAK18VE; S3 
> Extended Request ID: 
> VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=; 
> Proxy: null)
> at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
>  ~[?:?]
> at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395)
>  ~[?:?]
> at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371)
>  ~[?:?]
> at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
>  ~[?:?]
> at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
>  ~[?:?
> at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
>  ~[?:?]
> at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
>  ~[?:?]
> at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
>  ~[?:?]
> at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
>  ~[?:?]
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) 
> ~[?:?]
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) 
> ~[?:?]
> at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062) 
> ~[?:?]
> at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008) 
> ~?:?]
> at 
> com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3581)
>  ~[?:?]
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
>  ~[?:?]
> at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$initiateMultiPartUpload$0(WriteOperationHelper.java:199)
>  ~[?:?]
> at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) ~[?:?]
> ... 35 more
>
>
> On Wed, Mar 31, 2021 at 8:29 PM Guowei Ma  wrote:
>
>> Hi, Robert
>> I think you could try to change the "s3://argo-artifacts/" to "
>> s3a://argo-artifacts/".
>> It is because that currently `StreamingFileSink` only supports Hadoop
>> based s3 but not Presto based s3. [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#important-considerations
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, Apr 1, 2021 at 3:56 AM Robert Cullen 
>> wrote:
>>
>>> I’m using a local instance of MINIO on my kub

Re: Checkpoint timeouts at times of high load

2021-04-01 Thread Guowei Ma
Hi,
I think there are many reasons that could lead to the checkpoint timeout.
Would you like to share some detailed information of checkpoint? For
example, the detailed checkpoint information from the web.[1]  And which
Flink version do you use?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html

Best,
Guowei


On Thu, Apr 1, 2021 at 4:33 PM Geldenhuys, Morgan Karl <
morgan.geldenh...@tu-berlin.de> wrote:

> Hi Community,
>
>
> I have a number of flink jobs running inside my session cluster with
> varying checkpoint intervals plus a large amount of operator state and in
> times of high load, the jobs fail due to checkpoint timeouts (set to 6
> minutes). I can only assume this is because the latencies for saving
> checkpoints at these times of high load increase. I have a 30 node HDFS
> cluster for checkpoints... however I see that only 4 of these nodes are
> being used for storage. Is there a way of ensuring the load is evenly
> spread? Could there be another reason for these checkpoint timeouts? Events
> are being consumed from kafka, to kafka with EXACTLY ONCE guarantees
> enabled.
>
>
> Thank you very much!
>
>
> M.
>


Re: Flink Taskmanager failure recovery and large state

2021-04-01 Thread Guowei Ma
Hi, Yaroslav

AFAIK there is no official GCS FileSystem support in FLINK.  Does the GCS
is implemented by yourself?
Would you like to share the whole log of jm?

BTW: From the following log I think the implementation has already some
retry mechanism.
>>> Interrupted while sleeping before retry. Giving up after 1/10 retries
for 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d

Best,
Guowei


On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
yaroslav.tkache...@shopify.com> wrote:

> Hi everyone,
>
> I'm wondering if people have experienced issues with Taskmanager failure
> recovery when dealing with a lot of state.
>
> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints
> and checkpoints. ~150 task managers with 4 slots each.
>
> When I run a pipeline without much state and kill one of the
> taskmanagers, it takes a few minutes to recover (I see a few restarts), but
> eventually when a new replacement taskmanager is registered with the
> jobmanager things go back to healthy.
>
> But when I run a pipeline with a lot of state (1TB+) and kill one of the
> taskmanagers, the pipeline never recovers, even after the replacement
> taskmanager has joined. It just enters an infinite loop of restarts and
> failures.
>
> On the jobmanager, I see an endless loop of state transitions: RUNNING
> -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
> It stays in RUNNING for a few seconds, but then transitions into FAILED
> with a message like this:
>
>
> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph - 
> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
> FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> readAddress(..) failed: Connection reset by peer (connection to '
> 10.30.10.53/10.30.10.53:45789')
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> ...
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> readAddress(..) failed: Connection reset by peer
>
>
> Which, I guess, means a failed Taskmanager. And since there are not enough
> task slots to run it goes into this endless loop again. It's never the same
> Taskmanager that fails.
>
>
>
> On the Taskmanager side, things look more interesting. I see a variety of
> exceptions:
>
>
> org.apache.flink.runtime.taskmanager.Task -  (141/624)#7
> (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
> org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution
> attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.
>
>
> also
>
>
> WARNING: Failed read retry #1/10 for
> 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'.
> Sleeping...
> java.nio.channels.ClosedByInterruptException
> at
> java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown
> Source)
> at
> java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown
> Source)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
> at
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
> at java.base/java.io.DataInputStream.read(Unknown Source)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
> at java.base/java.io.InputStream.read(Unknown Source)
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
> ...
>
>
> and
>
>
> SEVERE: Interrupted while sleeping before retry. Giving up after 1/10
> retries for
> 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
> 20:52:46.894 [ (141/624)#7] ERROR
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder -
> Caught unexpected exception.
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
> at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
> at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
> at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
> at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>
>
> also
>
>
> 20:52:46.895 [ (141/624)#7] WARN
>  org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
> Exception while restoring keyed state backend for
> KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from

Re: ARM support

2021-04-01 Thread Guowei Ma
Hi, Rex

I think that Flink does not have an official release that supports the arm
architecture. There are some efforts and discussion [1][2][3] about
supporting the architecture. I think you could find some builds at
openlabtesting. [4]
But AFAIK there is no clear timeline about that.(correct me if I miss
something) There is a discussion [5] and I think you might find some
insight from there at that time.

[1] https://issues.apache.org/jira/browse/FLINK-13448
[2]
https://lists.apache.org/thread.html/a564836a3c7cc5300bec7729c2af1ad9d611d526bb59dd6cca72cc7b%40%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/2399c8a701bced2266f9658719807b98a2e593a99b949f50e9a1ab1a%40%3Cdev.flink.apache.org%3E
[4] http://status.openlabtesting.org/builds?project=apache%2Fflink
[5]
https://lists.apache.org/thread.html/5c4c75a2de979ed7ef1c661c15dd252569e598a374c27042b38d078b%40%3Cdev.flink.apache.org%3E

Best,
Guowei


On Thu, Apr 1, 2021 at 3:55 AM Rex Fenley  wrote:

> Hello,
>
> We would like to run Flink on ARM yet haven't found any resources
> indicating that this is yet possible. We are wondering what the timeline is
> for Flink supporting ARM. Given that all Mac Books are moving to ARM and
> that AWS is excitedly supporting ARM, it seems important that Flink also
> supports running on ARM.
>
> Thank you
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Guowei Ma
Hi, Sumeet

I am not an expert about PyFlink. But I think @Dian Fu
  might give some insight about this problem.

Best,
Guowei


On Thu, Apr 1, 2021 at 12:12 AM Sumeet Malhotra 
wrote:

> Cross posting from StackOverlow here:
>
>
> https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array
>
> Any pointers are appreciated!
>
> Thanks,
> Sumeet
>


Re: Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Guowei Ma
Hi, Kevin

If you use the RocksDB and want to know the data on the disk I think that
is the right metric. But the SST files might include some expired data.
Some data in memory is not included in the SST files yet. In general I
think it could reflect the state size of your application.

I think that there is no metric for the time that spends on restoring from
a savepoint.

As for why there is a huge difference between the size of sst and the size
of savepoint, I think @Yun can give some detailed insights.

Best,
Guowei


On Thu, Apr 1, 2021 at 1:38 AM Kevin Lam  wrote:

> Hi all,
>
> We're interested in doing some analysis on how the size of our savepoints
> and state affects the time it takes to restore from a savepoint. We're
> running Flink 1.12 and using RocksDB as a state backend, on Kubernetes.
>
> What is the best way to measure the size of a Flink Application's state?
> Is state.backend.rocksdb.metrics.total-sst-files-size
> 
> the right thing to look at?
>
> We tried looking at state.backend.rocksdb.metrics.total-sst-files-size for
> all our operators, after restoring from a savepoint, and we noticed that
> the sum of all the sst files sizes is much much smaller than the total size
> of our savepoint (7GB vs 10TB).  Where does that discrepancy come from?
>
> Do you have any general advice on correlating savepoint size with restore
> times?
>
> Thanks in advance!
>


Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

2021-03-31 Thread Guowei Ma
Hi, Robert
I think you could try to change the "s3://argo-artifacts/" to "
s3a://argo-artifacts/".
It is because that currently `StreamingFileSink` only supports Hadoop based
s3 but not Presto based s3. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#important-considerations

Best,
Guowei


On Thu, Apr 1, 2021 at 3:56 AM Robert Cullen  wrote:

> I’m using a local instance of MINIO on my kubernetes cluster for
> checkpoint/savepoint storage. I’m using this StreamingFileSync
> configuration:
>
>
> final StreamingFileSink> sink =
> StreamingFileSink.forRowFormat(
> new Path("s3://argo-artifacts/"),
> new SimpleStringEncoder Long>>("UTF-8"))
> .withBucketAssigner(new KeyBucketAssigner())
> .withRollingPolicy(OnCheckpointRollingPolicy.build())
> .withOutputFileConfig(config)
> .build();
>
> Anyone know how to fix this exception?
>
> java.lang.UnsupportedOperationException: This s3 file system implementation 
> does not support recoverable writers.
> at 
> org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
>  ~[?:?]
> at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:132)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:70)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>  ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
> ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>
> --
> Robert Cullen
> 240-475-4490
>


Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Guowei Ma
Hi, community:

Friendly reminder that today (3.31) is the last day of feature development.
Under normal circumstances, you will not be able to submit new features
from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for testing,
welcome to help test together.
After the test is relatively stable, we will cut the release-1.13 branch.

Best,
Dawid & Guowei


On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann  wrote:

> +1 for the 31st of March for the feature freeze.
>
> Cheers,
> Till
>
> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
> wrote:
>
> > +1 for March 31st for the feature freeze.
> >
> >
> >
> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz  >
> > wrote:
> >
> > > Thank you Thomas! I'll definitely check the issue you linked.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 23/03/2021 20:35, Thomas Weise wrote:
> > > > Hi Dawid,
> > > >
> > > > Thanks for the heads up.
> > > >
> > > > Regarding the "Rebase and merge" button. I find that merge option
> > useful,
> > > > especially for small simple changes and for backports. The following
> > > should
> > > > help to safeguard from the issue encountered previously:
> > > > https://github.com/jazzband/pip-tools/issues/1085
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
> > dwysakow...@apache.org
> > > >
> > > > wrote:
> > > >
> > > >> Hi devs, users!
> > > >>
> > > >> 1. *Feature freeze date*
> > > >>
> > > >> We are approaching the end of March which we agreed would be the
> time
> > > for
> > > >> a Feature Freeze. From the knowledge I've gather so far it still
> seems
> > > to
> > > >> be a viable plan. I think it is a good time to agree on a particular
> > > date,
> > > >> when it should happen. We suggest *(end of day CEST) March 31st*
> > > >> (Wednesday next week) as the feature freeze time.
> > > >>
> > > >> Similarly as last time, we want to create RC0 on the day after the
> > > feature
> > > >> freeze, to make sure the RC creation process is running smoothly,
> and
> > to
> > > >> have a common testing reference point.
> > > >>
> > > >> Having said that let us remind after Robert & Dian from the previous
> > > >> release what it a Feature Freeze means:
> > > >>
> > > >> *B) What does feature freeze mean?*After the feature freeze, no new
> > > >> features are allowed to be merged to master. Only bug fixes and
> > > >> documentation improvements.
> > > >> The release managers will revert new feature commits after the
> feature
> > > >> freeze.
> > > >> Rational: The goal of the feature freeze phase is to improve the
> > system
> > > >> stability by addressing known bugs. New features tend to introduce
> new
> > > >> instabilities, which would prolong the release process.
> > > >> If you need to merge a new feature after the freeze, please open a
> > > >> discussion on the dev@ list. If there are no objections by a PMC
> > member
> > > >> within 48 (workday)hours, the feature can be merged.
> > > >>
> > > >> 2. *Merge PRs from the command line*
> > > >>
> > > >> In the past releases it was quite frequent around the Feature Freeze
> > > date
> > > >> that we ended up with a broken main branch that either did not
> compile
> > > or
> > > >> there were failing tests. It was often due to concurrent merges to
> the
> > > main
> > > >> branch via the "Rebase and merge" button. To overcome the problem we
> > > would
> > > >> like to suggest only ever merging PRs from a command line. Thank you
> > > >> Stephan for the idea! The suggested workflow would look as follows:
> > > >>
> > > >>1. Pull the change and rebase on the current main branch
> > > >>2. Build the project (e.g. from IDE, which should be faster than
> > > >>building entire project from cmd) -> this should ensure the
> project
> > > compiles
> > > >>3. Run the tests in the module that the change affects -> this
> > should
> > > >>greatly minimize the chances of failling tests
> > > >>4. Push the change to the main branch
> > > >>
> > > >> Let us know what you think!
> > > >>
> > > >> Best,
> > > >>
> > > >> Guowei & Dawid
> > > >>
> > > >>
> > > >>
> > >
> > >
> >
>


Re: Native kubernetes execution and History server

2021-03-25 Thread Guowei Ma
Hi,
Thanks for providing the logs. From the logs this is a known bug.[1]
Maybe you could use `$internal.pipeline.job-id` to set your own
job-id.(Thanks to Wang Yang)
But keep in mind this is only for internal use and may be changed in
some release. So you should keep an eye on [1] for the correct solution.

[1] https://issues.apache.org/jira/browse/FLINK-19358

Best,
Guowei


On Thu, Mar 25, 2021 at 5:31 PM Lukáš Drbal  wrote:

> Hello,
>
> sure. Here is log from first run which succeed -
> https://pastebin.com/tV75ZS5S
> and here is from second run (it's same for all next) -
> https://pastebin.com/pwTFyGvE
>
> My Docker file is pretty simple, just take wordcount + S3
>
> FROM flink:1.12.2
>
> RUN mkdir -p $FLINK_HOME/usrlib
> COPY flink-examples-batch_2.12-1.12.2-WordCount.jar
>  $FLINK_HOME/usrlib/wordcount.jar
>
> RUN mkdir -p ${FLINK_HOME}/plugins/s3-fs-presto
> COPY flink-s3-fs-presto-1.12.2.jar $FLINK_HOME/plugins/s3-fs-presto/
>
> Thanks!
>
> On Thu, Mar 25, 2021 at 9:24 AM Guowei Ma  wrote:
>
>> Hi,
>> After some discussion with Wang Yang offline, it seems that there might
>> be a jobmanager failover. So would you like to share full jobmanager log?
>> Best,
>> Guowei
>>
>>
>> On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal 
>> wrote:
>>
>>> Hi,
>>>
>>> I would like to use native kubernetes execution [1] for one batch job
>>> and let scheduling on kubernetes. Flink version: 1.12.2.
>>>
>>> Kubernetes job:
>>> apiVersion: batch/v1beta1
>>> kind: CronJob
>>> metadata:
>>>   name: scheduled-job
>>> spec:
>>>   schedule: "*/1 * * * *"
>>>   jobTemplate:
>>> spec:
>>>   template:
>>> metadata:
>>>   labels:
>>> app: super-flink-batch-job
>>> spec:
>>>   containers:
>>>   - name: runner
>>> image: localhost:5000/batch-flink-app-v3:latest
>>> imagePullPolicy: Always
>>> command:
>>>   - /bin/sh
>>>   - -c
>>>   - /opt/flink/bin/flink run-application --target
>>> kubernetes-application -Dkubernetes.service-account=flink-service-account
>>> -Dkubernetes.rest-service.exposed.type=NodePort
>>> -Dkubernetes.cluster-id=batch-job-cluster
>>> -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
>>> -Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY
>>> -Ds3.secret-key=SECRETKEY
>>> -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
>>> -Ds3.path-style-access=true -Ds3.ssl.enabled=false
>>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>> -Dhigh-availability.storageDir=s3://flink/flink-ha
>>> local:///opt/flink/usrlib/job.jar
>>>   restartPolicy: OnFailure
>>>
>>>
>>> This works well for me but I would like to write the result to the
>>> archive path and show it in the History server (running as separate
>>> deployment in k8)
>>>
>>> Anytime it creates JobId= which
>>> obviously leads to
>>>
>>> Caused by: java.util.concurrent.ExecutionException:
>>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>>> already been submitted.
>>> at
>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>> ~[?:1.8.0_282]
>>> at
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>> ~[?:1.8.0_282]
>>> at
>>> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
>>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>> at
>>> org.apache.flink.examples.java.word

Re: flink sql jmh failure

2021-03-25 Thread Guowei Ma
Hi,
I am not an expert of JMH but it seems that it is not an error. From the
log it looks like that the job is not finished.
The data source continues to read data when JMH finishes.

Thread[Legacy Source Thread - Source:
TableSourceScan(table=[[default_catalog, default_database,
CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint,
first_int, second_int, first_float, second_float, first_double,
second_double, first_string, second_string]) ->
Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint,
first_int, second_int, first_float, second_float, first_double,
second_double, first_string, second_string]) -> Sink:
Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK],
fields=[dt, first_bigint, second_bigint, first_int, second_int,
first_float, second_float, first_double, second_double, first_string,
second_string]) (3/6),5,Flink Task Threads]
  at
org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
  at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
  at
org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
  at
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

Best,
Guowei


On Wed, Mar 24, 2021 at 9:56 PM jie mei  wrote:

> Hi, Yik San
>
> I use a library wroten by myself and trying to verify the performance.
>
>
> Yik San Chan  于2021年3月24日周三 下午9:07写道:
>
>> Hi Jie,
>>
>> I am curious what library do you use to get the ClickHouseTableBuilder
>>
>> On Wed, Mar 24, 2021 at 8:41 PM jie mei  wrote:
>>
>>> Hi, Community
>>>
>>> I run a jmh benchmark task get blew error, which use flink sql consuming
>>> data from data-gen connector(10_000_000) and write data to clickhouse. blew
>>> is partly log and you can see completable log by attached file
>>>
>>> *My jmh benchmark code as blew:*
>>>
>>> @Benchmark
>>> @Threads(1)
>>> @Fork(1)
>>> public void sinkBenchmark() throws IOException {
>>>
>>>   StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
>>>   .getExecutionEnvironment();
>>>   streamEnv.enableCheckpointing(6);
>>>
>>>   EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>   .useBlinkPlanner()
>>>   .inStreamingMode().build();
>>>   TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
>>> settings);
>>>
>>>   // create clickhouse table
>>>   new ClickHouseTableBuilder(tableEnv,
>>>   parseSchema("clickhouse_sink_table.sql"))
>>>   .database("benchmark")
>>>   .table("bilophus_sink_benchmark")
>>>   .address("jdbc:clickhouse://localhost:8123")
>>>   .build();
>>>
>>>   // create mock data table
>>>   tableEnv.executeSql(
>>>   parseSchema("clickhouse_source_table.sql") +
>>>   "WITH (" +
>>>   "'connector' = 'datagen'," +
>>>   "'number-of-rows' = '1000')");
>>>
>>>   tableEnv.executeSql(
>>>   "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM 
>>> CLICKHOUSE_SOURCE_BENCHMARK");
>>>
>>> }
>>>
>>> *running command:*
>>>
>>> mvn clean package -DskipTests
>>>
>>> 
>>>   org.codehaus.mojo
>>>   exec-maven-plugin
>>>   1.6.0
>>>   
>>> 
>>>   test-benchmarks
>>>   test
>>>   
>>> exec
>>>   
>>> 
>>>   
>>>   
>>> false
>>> test
>>> java
>>> 
>>>   -Xmx6g
>>>   -classpath
>>>   
>>>   org.openjdk.jmh.Main
>>>   
>>>   -foe
>>>   true
>>>   
>>>   -f
>>>   1
>>>   -i
>>>   1
>>>   -wi
>>>   0
>>>   -rf
>>>   csv
>>>   .*
>>> 
>>>   
>>> 
>>>
>>>
>>> Non-finished threads:
>>>
>>> Thread[Source: TableSourceScan(table=[[default_catalog,
>>> default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint,
>>> second_bigint, first_int, second_int, first_float, second_float,
>>> first_double, second_double, first_string, s
>>> econd_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0,
>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>> second_float, first_double, second_double, first_string, second_string]) ->
>>> Sink: Sink(table=[default_catal
>>> og.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt,
>>> first_bigint, second_bigint, first_int, second_int, first_float,
>>> second_float, 

Re: Native kubernetes execution and History server

2021-03-25 Thread Guowei Ma
Hi,
After some discussion with Wang Yang offline, it seems that there might be
a jobmanager failover. So would you like to share full jobmanager log?
Best,
Guowei


On Wed, Mar 24, 2021 at 10:04 PM Lukáš Drbal  wrote:

> Hi,
>
> I would like to use native kubernetes execution [1] for one batch job and
> let scheduling on kubernetes. Flink version: 1.12.2.
>
> Kubernetes job:
> apiVersion: batch/v1beta1
> kind: CronJob
> metadata:
>   name: scheduled-job
> spec:
>   schedule: "*/1 * * * *"
>   jobTemplate:
> spec:
>   template:
> metadata:
>   labels:
> app: super-flink-batch-job
> spec:
>   containers:
>   - name: runner
> image: localhost:5000/batch-flink-app-v3:latest
> imagePullPolicy: Always
> command:
>   - /bin/sh
>   - -c
>   - /opt/flink/bin/flink run-application --target
> kubernetes-application -Dkubernetes.service-account=flink-service-account
> -Dkubernetes.rest-service.exposed.type=NodePort
> -Dkubernetes.cluster-id=batch-job-cluster
> -Dkubernetes.container.image=localhost:5000/batch-flink-app-v3:latest
> -Ds3.endpoint=http://minio-1616518256:9000 -Ds3.access-key=ACCESSKEY
> -Ds3.secret-key=SECRETKEY
> -Djobmanager.archive.fs.dir=s3://flink/completed-jobs/
> -Ds3.path-style-access=true -Ds3.ssl.enabled=false
> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> -Dhigh-availability.storageDir=s3://flink/flink-ha
> local:///opt/flink/usrlib/job.jar
>   restartPolicy: OnFailure
>
>
> This works well for me but I would like to write the result to the archive
> path and show it in the History server (running as separate deployment in
> k8)
>
> Anytime it creates JobId= which obviously
> leads to
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
> already been submitted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> ~[?:1.8.0_282]
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> ~[?:1.8.0_282]
> at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
> ~[?:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_282]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_282]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_282]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> ... 10 more
>
> I assume it is because it will spawn a completely new cluster for each run.
>
> Can I somehow set jobId or I'm trying to do something unsupported/bad?
>
> Thanks for advice.
>
> L.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html
>


Re: Re: About Memory Spilling to Disk in Flink

2021-03-24 Thread Guowei Ma
Hi, Roc
Thanks for your detailed explanation.
I could not find any "stream" operator that uses `ExternalSorterBuilder` by
"find usage" of the IDEA.
Best,
Guowei


On Wed, Mar 24, 2021 at 3:27 PM Roc Marshal  wrote:

> Hi, Guowei Ma.
> As far as I know, flink writes some in-memory data to disk when memory
> is running low. I noticed that flink uses ExternalSorterBuilder for batch
> operations in the org.apache.flink.runtime.operator.sort package, but I'm
> curious to confirm if this technique is also used in stream mode.
> Thank you.
>
> Best,
> Roc
>
>
>
>
>
> At 2021-03-24 15:01:48, "Guowei Ma"  wrote:
>
> Hi, Roc
> Could you explain more about your question?
> Best,
> Guowei
>
>
> On Wed, Mar 24, 2021 at 2:47 PM Roc Marshal  wrote:
>
>> Hi,
>>
>> Can someone tell me where flink uses memory spilling to write to
>> disk?
>> Thank you.
>>
>> Best, Roc.
>>
>>
>>
>>
>
>
>
>


Re: About Memory Spilling to Disk in Flink

2021-03-24 Thread Guowei Ma
Hi, Roc
Could you explain more about your question?
Best,
Guowei


On Wed, Mar 24, 2021 at 2:47 PM Roc Marshal  wrote:

> Hi,
>
> Can someone tell me where flink uses memory spilling to write to disk?
> Thank you.
>
> Best, Roc.
>
>
>
>


Re: Fault Tolerance with RocksDBStateBackend

2021-03-24 Thread Guowei Ma
Hi,
You need some persistent storages(like hdfs) for the checkpoint. It is
Flink's fault tolerance prerequisites.[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/checkpointing.html#prerequisites
Best,
Guowei


On Wed, Mar 24, 2021 at 1:21 PM Maminspapin  wrote:

> Hi everyone,
>
> I want to build a flink cluster with 3 machines.
> What if I choose RocksDBStateBackend with next settings:
>
>
> #==
> # Fault tolerance and checkpointing
>
> #==
>
> state.backend: rocksdb
> state.checkpoints.dir: file:///home/flink/checkpoints
> state.backend.incremental: true
> jobmanager.execution.failover-strategy: region
>
>
> Is it necessary to use distributed file systems like hdfs to provide fault
> tolerance (i.e. state.checkpoints.dir: hdfs:///home/flink/checkpoints)?
>
> There is no fault tolerance with file:// schema?
>
> Thanks,
> Yuri L.
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Kubernetes Application Cluster Not Working

2021-03-23 Thread Guowei Ma
Hi, M
Could you give the full stack? This might not be the root cause.
Best,
Guowei


On Wed, Mar 24, 2021 at 2:46 AM Claude M  wrote:

> Hello,
>
> I'm trying to setup Flink in Kubernetes using the Application Mode as
> described here:
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes
>
> The doc mentions that there needs to be a aervice exposing the
> JobManager’s REST and UI ports.  It then points to a link w/ the resource
> definitions:
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#application-cluster-resource-definitions
> and I defined the following service along w/ the jobmanager, taskmanager,
> and flink-conf.
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
> spec:
>   type: ClusterIP
>   ports:
>   - name: rpc
> port: 6123
>   - name: blob-server
> port: 6124
>   - name: webui
> port: 8081
>   selector:
> app: flink
> component: jobmanager
>
>
> I am able to access the jobmanager UI but the taskmanagers are failing w/
> the following error:
> Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager
> :6123/user/rpc/resourcemanager_*
>
> Any ideas about this?
>
>
> Thanks
>


  1   2   >