Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 Thread Biao Liu
Congrats!

Thanks,
Biao /'bɪ.aʊ/



On Tue, 28 Mar 2023 at 10:29, Hang Ruan  wrote:

> Congratulations!
>
> Best,
> Hang
>
> On Tue, 28 Mar 2023 at 10:27, yu zelin wrote:
>
>> Congratulations!
>>
>> Best,
>> Yu Zelin
>>
>> On 27 Mar 2023, at 17:23, Yu Li wrote:
>>
>> Dear Flinkers,
>>
>>
>>
>> As you may have noticed, we are pleased to announce that Flink Table Store 
>> has joined the Apache Incubator as a separate project called Apache 
>> Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
>> streaming data lake platform for high-speed data ingestion, change data 
>> tracking and efficient real-time analytics, with the vision of supporting a 
>> larger ecosystem and establishing a vibrant and neutral open source 
>> community.
>>
>>
>>
>> We would like to thank everyone for their great support and efforts for the 
>> Flink Table Store project, and warmly welcome everyone to join the 
>> development and activities of the new project. Apache Flink will continue to 
>> be one of the first-class citizens supported by Paimon, and we believe that 
>> the Flink and Paimon communities will maintain close cooperation.
>>
>>
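>> Dear Flinkers,
>>
>>
>> As you may have noticed, we are pleased to announce that Flink Table Store
>> has officially joined the Apache Incubator to incubate as an independent
>> project [1] [2] [3]. The new project is named Apache Paimon(incubating) and
>> remains committed to building a new-generation streaming lakehouse platform
>> that supports high-speed data ingestion, streaming data subscription and
>> efficient real-time analytics. In addition, the new project will support a
>> richer ecosystem and establish a vibrant and neutral open source community.
>>
>>
>> We would like to thank everyone for their strong support of and investment
>> in the Flink Table Store project, and we warmly welcome everyone to join the
>> development and community activities of the new project. Apache Flink will
>> continue to be one of the main compute engines supported by Paimon, and we
>> believe that the Flink and Paimon communities will continue to cooperate
>> closely.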
>>
>>
>> Best Regards,
>> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>>
>> Regards,
>> Li Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>>
>> [1] https://paimon.apache.org/
>> [2] https://github.com/apache/incubator-paimon
>> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
>>
>>
>>


Re: Job cannot be deployed when use detached mode

2020-01-17 Thread Biao Liu
Ah, thanks Yang for the fixup. I misunderstood the original answer.

Thanks,
Biao /'bɪ.aʊ/



On Fri, 17 Jan 2020 at 16:39, Yang Wang  wrote:

> Hi sysuke,
>
> >> Why the Yarn per-job attach mode could work, but detach mode could not?
> This is because in 1.9 and earlier versions, per-job mode has very
> different code paths for attach and detach mode. In attach mode, the Flink
> client starts a session cluster and then submits the job to that session,
> so all the user jars are loaded by the user classloader, not the system
> classloader. In detach mode, all the jars are shipped as Yarn local
> resources and appended to the system classpath of the jobmanager and
> taskmanager.
> This behavior changes in 1.10: both detach and attach mode will always be
> real per-job mode, not simulated by a session. You can check FLIP-82 for
> more information [1].
>
> >> How to fix this problem?
> 1. If your Yarn cluster can support multiple hdfs clusters, then you do
> not need to add the hdfs configuration to your jar. That's how we use it in
> our production environment.
> 2. If you cannot change this and you will use Flink 1.10, you can set
> `yarn.per-job-cluster.include-user-jar: DISABLED`. Then the user jars
> will not be added to the system classpath.
> Instead, they will be loaded by the user classloader. This is a new feature in
> 1.10. See [2] for more information.
> 3. If you are still using 1.9 or an earlier version, move the hdfs
> configuration out of your jar. Then use `-yt`
> to ship your hadoop configuration and reset the hadoop env:
> -yt /path/of/my-hadoop-conf
> -yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
> -yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
>
>
> [1].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-82%3A+Use+real+per-job+mode+for+YARN+per-job+attached+execution
> [2].https://issues.apache.org/jira/browse/FLINK-13993
>
>
> Best,
> Yang
>
> On Fri, 17 Jan 2020 at 12:02, sysuke Lee wrote:
>
>> Hi all,
>> We've got a jar with hadoop configuration files in it.
>>
>> Previously we used blocking mode to deploy jars on YARN, and they ran well.
>> Recently we found that the client process occupies more and more memory, so
>> we tried detached mode, but the job failed to deploy with the following
>> error information:
>>
>> The program finished with the following exception:
>>
>> org.apache.flink.client.deployment.ClusterDeploymentException: Could not 
>> deploy Yarn job cluster.
>> at 
>> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
>> at 
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:230)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at 
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> Caused by: 
>> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: 
>> The YARN application unexpectedly switched to state FAILED during deployment.
>> Diagnostics from YARN: Application application_1533815330295_30183 failed 2 
>> times due to AM Container for appattempt_ exited with  exitCode: 1
>> For more detailed output, check application tracking page:http:Then, 
>> click on links to logs of each attempt.
>> Diagnostics: Exception from container-launch.
>> Container id: container_e05_
>> Exit code: 1
>> Stack trace: ExitCodeException exitCode=1:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:593)
>> at org.apache.hadoop.util.Shell.run(Shell.java:490)
>> at 
>> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:784)
>> at 
>> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:298)
>> at 
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:324)
>> at 
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Shell output: main : command provided 1
>> main : user 

Re: How to handle startup for mandatory config parameters?

2020-01-17 Thread Biao Liu
Hi John,

ParameterTool is just a utility that helps users handle arguments.
I guess you are using ParameterTool in the main method. If so, the log should
be in the client log file; as Yang said, it's under "{FLINK_HOME}/log".

> Do I check someConfig for whatever requirement and just throw an
> exception before starting the job or should I do System.exit();

I'm not sure what exactly you want.
Throwing an exception or calling System.exit would both fail the job (it
depends on where your code is). However, invoking System.exit is not always
good practice.
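
A minimal sketch of the "throw an exception" option, in plain Java so it runs without Flink on the classpath. The Map stands in for the parsed arguments, and the class name, the helper `requireParam` and the key `other.config` are made up for illustration. As far as I know, ParameterTool also offers `getRequired(key)`, which throws when the key is missing, so in a real job that may be all you need.

```java
import java.util.HashMap;
import java.util.Map;

/** Fail-fast validation of mandatory job parameters (plain-Java sketch). */
final class MandatoryConfigCheck {

    /**
     * Returns the value for {@code key}, or throws if it is absent or blank.
     * The Map is a stand-in for whatever holds the parsed job arguments.
     */
    static String requireParam(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing mandatory parameter: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("some.config", "value-from-cli");

        // Validate before building the job, so a bad config fails on the client.
        String someConfig = requireParam(params, "some.config");
        System.out.println("some.config = " + someConfig);

        try {
            requireParam(params, "other.config"); // hypothetical key, not set above
        } catch (IllegalArgumentException e) {
            // A real main() would let this propagate instead of calling System.exit().
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

If the check runs in the main method before the job is submitted, the exception and its message should end up on the console and in the client log mentioned above.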

Thanks,
Biao /'bɪ.aʊ/



On Fri, 17 Jan 2020 at 04:59, John Smith  wrote:

> Sorry, I should have specified: how do I handle job-specific config
> parameters using ParameterTool?
>
> ParameterTool parameters = ...
>
> String someConfig = parameters.get("some.config"); <--- This is mandatory
>
>
> Do I check someConfig for whatever requirement and just throw an
> exception before starting the job or should I do System.exit(); Log it...
> Where does the log go if I log it?
>
> On Wed, 15 Jan 2020 at 22:21, Yang Wang  wrote:
>
>> Hi John,
>>
>> Most of the config options have default values. However, you still need
>> to specify some required fields, for example the taskmanager
>> resource-related options. If you do not specify any of them, an exception
>> like the following will be thrown on the client side.
>>
>> Exception in thread "main"
>> org.apache.flink.configuration.IllegalConfigurationException: Either Task
>> Heap Memory size (taskmanager.memory.task.heap.size) and Managed Memory
>> size (taskmanager.memory.managed.size), or Total Flink Memory size
>> (taskmanager.memory.flink.size), or Total Process Memory size
>> (taskmanager.memory.process.size) need to be configured explicitly.
>> at
>> org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:149)
>> at
>> org.apache.flink.runtime.util.BashJavaUtils.getTmResourceJvmParams(BashJavaUtils.java:62)
>> at org.apache.flink.runtime.util.BashJavaUtils.main(BashJavaUtils.java:46)
>>
>>
>> Also, when you deploy Flink on a Yarn cluster, it will check the queue
>> configuration, resources, etc.
>> If a config exception is thrown during startup, the Flink client will
>> fail and print the exception on the console and in the client logs
>> (usually in the {FLINK_HOME}/log directory).
>>
>> However, not all config options can be checked on the client side.
>> For example, if you set a wrong checkpoint path, you need to look for
>> the exceptions or errors in the jobmanager logs.
>>
>>
>>
>> Best,
>> Yang
>>
>> On Thu, 16 Jan 2020 at 00:38, John Smith wrote:
>>
>>> Hi, so I have no problem reading config from resources files or anything
>>> like that...
>>>
>>> But my question is around how do we handle mandatory fields?
>>>
>>> 1- If a mandatory field is missing during startup... Do we just "log" it
>>> and do System.exit()?
>>> 2- If we do log it where does the log end up, the task or the job node?
>>>
>>


Re: savepoint failed for finished tasks

2020-01-17 Thread Biao Liu
Hi Fanbin,

Congxian is right. We can't support checkpoints or savepoints on finite
stream jobs for now.

Thanks,
Biao /'bɪ.aʊ/



On Fri, 17 Jan 2020 at 16:26, Congxian Qiu  wrote:

> Hi
>
> Currently, checkpoints/savepoints only work if all operators/tasks are
> still running. There is an issue [1] tracking this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-2491
>
> Best,
> Congxian
>
>
> On Fri, 17 Jan 2020 at 06:49, Fanbin Bu wrote:
>
>> Hi,
>>
>> I couldn't make a savepoint for the following graph:
>> [image: image.png]
>>
>> with stacktrace:
>> Caused by: java.util.concurrent.CompletionException:
>> java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger
>> savepoint. Failure reason: Not all required tasks are currently running.
>>
>> Here is my Snowflake source definition:
>> val batch = env.createInput(JDBCInputFormat.buildJDBCInputFormat
>>   .setDrivername(options.driverName)
>>   .setDBUrl(options.dbUrl)
>>   .setUsername(options.username)
>>   .setPassword(options.password)
>>   .setQuery(query)
>>   .setRowTypeInfo(getInputRowTypeInfo)
>>   .setFetchSize(fetchSize)
>>   .setParametersProvider(new
>> GenericParameterValuesProvider(buildQueryParams(parallelism)))
>>   .finish, getReturnType)
>>
>> where query is something like
>> select * from events where timestamp > t0 and timestamp < t1
>>
>> My theory is that the snowflake_batch_source task changed to FINISHED
>> once it had read all the data, and then the savepoint failed.
>>
>> Is there any way to make a savepoint for such cases?
>>
>> Thanks,
>> Fanbin
>>
>>
>>
>>
>>


Re: Job cannot be deployed when use detached mode

2020-01-17 Thread Biao Liu
Hi Sysuke,

Could you check the JM log (YARN AM container log) first?
You might find the direct failure message there.

Thanks,
Biao /'bɪ.aʊ/



On Fri, 17 Jan 2020 at 12:02, sysuke Lee  wrote:

> Hi all,
> We've got a jar with hadoop configuration files in it.
>
> Previously we used blocking mode to deploy jars on YARN, and they ran well.
> Recently we found that the client process occupies more and more memory, so
> we tried detached mode, but the job failed to deploy with the following
> error information:
>
> The program finished with the following exception:
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not 
> deploy Yarn job cluster.
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:230)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: 
> The YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1533815330295_30183 failed 2 
> times due to AM Container for appattempt_ exited with  exitCode: 1
> For more detailed output, check application tracking page:http:Then, 
> click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_e05_
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:593)
> at org.apache.hadoop.util.Shell.run(Shell.java:490)
> at 
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:784)
> at 
> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:298)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:324)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Shell output: main : command provided 1
> main : user is streams
> main : requested yarn user is user1
>
>
> Then I found this email,
> http://mail-archives.apache.org/mod_mbox/flink-user/201901.mbox/
> , and set *yarn.per-job-cluster.include-user-jar: LAST*, after which part
> of our jobs could be deployed as expected.
>
> But for some jobs that need to operate on another hdfs, with hadoop conf
> files in them, there's still a problem: the job manager cannot resolve the
> hdfs domain name. I guess it's because the hadoop conf file in the jar is
> loaded instead of the conf file in the client hadoop dir.
>
> Can anyone help?
>


Re: Job Manager heap metrics

2020-01-17 Thread Biao Liu
Hi RKandoji,

> Could someone please tell me what is the best way to check amount of heap
> consumed by Job Manager?

I'm not sure there is a single best way to do this. However, there are some
workarounds:
1. Through metrics [1].
2. Print GC details to stdout. You could do this through the config item
"env.java.opts.jobmanager", e.g. "env.java.opts.jobmanager:
-XX:+PrintGCDetails".
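
For illustration only, here is a small plain-Java sketch of the heap numbers the JVM exposes through `java.lang.management`, which is the kind of data the memory metrics in [1] report. It is not Flink code, and the class name `HeapUsageProbe` is made up. It only measures the JVM it runs in, so to say anything about the Job Manager it would have to run inside that process or the same values would have to be read remotely, for example over JMX.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

/** Prints the heap usage of the JVM this code runs in. */
final class HeapUsageProbe {

    /** Formats used/committed/max heap in MiB; max is -1 when the JVM reports it as undefined. */
    static String describeHeap() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        long mib = 1024L * 1024L;
        long max = heap.getMax() < 0 ? -1 : heap.getMax() / mib;
        return String.format("heap used=%d MiB, committed=%d MiB, max=%d MiB",
                heap.getUsed() / mib, heap.getCommitted() / mib, max);
    }

    public static void main(String[] args) {
        System.out.println(describeHeap());
    }
}
```

Watching the "used" value over time (or the GC log from option 2) is probably a better basis for sizing the heap than a single reading.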

> Also is there any relationship between heap memory settings and
> checkpointing, I did not see any but curious.

I guess there is no direct relationship between memory settings and
checkpointing, but I don't think there is a short answer that explains
this clearly. Problems such as OOM or heavy full GC can affect almost
everything, and checkpointing is of course included :)


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#memory

Thanks,
Biao /'bɪ.aʊ/



On Fri, 17 Jan 2020 at 09:12, RKandoji  wrote:

> Hi,
>
> Could someone please tell me what is the best way to check amount of heap
> consumed by Job Manager?
>
> Currently I added huge heap of 20GB for both Job Manager and Task Manager.
> I'm able to see task manager heap usage on UI but not for Job Manager.
>
> I would like to decide how much heap to allocate based on usage.
>
> Also is there any relationship between heap memory settings and
> checkpointing, I did not see any but curious.
>
> Thanks,
> RK
>


Re: [DISCUSS] Change default for RocksDB timers: Java Heap => in RocksDB

2020-01-16 Thread Biao Liu
+1

I think that's how it should be. Timers should be aligned with other regular
state.

If users want better performance and memory is not a concern, the memory or
FS state backend might be considered. Or maybe we could optimize
performance by introducing a dedicated column family for timers. It could
have its own tuned options.

Thanks,
Biao /'bɪ.aʊ/



On Fri, 17 Jan 2020 at 10:11, Jingsong Li  wrote:

> Hi Stephan,
>
> Thanks for starting this discussion.
> +1 for storing timers in RocksDB by default.
> In the past, when Flink didn't store timers in RocksDB, I had a
> headache. I always adjusted parameters carefully to ensure that there was
> no risk of Out of Memory.
>
> Just curious, how much do heap and RocksDB timers differ in performance?
> - if there is no order of magnitude difference between heap and RocksDB,
> there is no problem in using RocksDB.
> - if there is, maybe we should improve our documentation to let users know
> about this option. (Looks like a lot of users didn't know)
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 17, 2020 at 3:18 AM Yun Tang  wrote:
>
>> Hi Stephan,
>>
>> I am +1 for the change which stores timers in RocksDB by default.
>>
>> Some users hope checkpoints can complete as fast as possible, which
>> also requires timers to be stored in RocksDB so that they do not affect
>> the sync part of the checkpoint.
>>
>> Best
>> Yun Tang
>> --
>> *From:* Andrey Zagrebin 
>> *Sent:* Friday, January 17, 2020 0:07
>> *To:* Stephan Ewen 
>> *Cc:* dev ; user 
>> *Subject:* Re: [DISCUSS] Change default for RocksDB timers: Java Heap =>
>> in RocksDB
>>
>> Hi Stephan,
>>
>> Thanks for starting this discussion. I am +1 for this change.
>> In general, number of timer state keys can have the same order as number
>> of main state keys.
>> So if RocksDB is used for main state for scalability, it makes sense to
>> have timers there as well
>> unless timers are used for only very limited subset of keys which fits
>> into memory.
>>
>> Best,
>> Andrey
>>
>> On Thu, Jan 16, 2020 at 4:27 PM Stephan Ewen  wrote:
>>
>> Hi all!
>>
>> I would suggest a change of the current default for timers. A bit of
>> background:
>>
>>   - Timers (for windows, process functions, etc.) are state that is
>> managed and checkpointed as well.
>>   - When using the MemoryStateBackend and the FsStateBackend, timers are
>> kept on the JVM heap, like regular state.
>>   - When using the RocksDBStateBackend, timers can be kept in RocksDB
>> (like other state) or on the JVM heap. The JVM heap is the default though!
>>
>> I find this a bit un-intuitive and would propose to change this to let
>> the RocksDBStateBackend store all state in RocksDB by default.
>> The rationale being that if there is a tradeoff (like here), safe and
>> scalable should be the default and unsafe performance be an explicit choice.
>>
>> This sentiment seems to be shared by various users as well, see
>> https://twitter.com/StephanEwen/status/1214590846168903680 and
>> https://twitter.com/StephanEwen/status/1214594273565388801
>> We would of course keep the switch and mention in the performance tuning
>> section that this is an option.
>>
>> # RocksDB State Backend Timers on Heap
>>   - Pro: faster
>>   - Con: not memory safe, GC overhead, longer synchronous checkpoint
>> time, no incremental checkpoints
>>
>> # RocksDB State Backend Timers in RocksDB
>>   - Pro: safe and scalable, asynchronously and incrementally checkpointed
>>   - Con: performance overhead.
>>
>> Please chime in and let me know what you think.
>>
>> Best,
>> Stephan
>>
>>
>
> --
> Best, Jingsong Lee
>


Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread Biao Liu
Congrats!

Thanks,
Biao /'bɪ.aʊ/



On Fri, 17 Jan 2020 at 13:43, Rui Li  wrote:

> Congratulations Dian, well deserved!
>
> On Thu, Jan 16, 2020 at 5:58 PM jincheng sun 
> wrote:
>
>> Hi everyone,
>>
>> I'm very happy to announce that Dian accepted the offer of the Flink PMC
>> to become a committer of the Flink project.
>>
>> Dian Fu has been contributing to Flink for many years. Dian Fu played an
>> essential role in PyFlink/CEP/SQL/Table API modules. Dian Fu has
>> contributed several major features, reported and fixed many bugs, spent a
>> lot of time reviewing pull requests, and frequently helped out on the
>> user mailing lists and with checking/voting on releases.
>>
>> Please join me in congratulating Dian on becoming a Flink committer!
>>
>> Best,
>> Jincheng(on behalf of the Flink PMC)
>>
>
>
> --
> Best regards!
> Rui Li
>



Re: DataStream API min max aggregation on other fields

2019-12-19 Thread Biao Liu
Hi Lu,

@vino yang  I think what he means is that the "max"
semantics between window and non-window are different. It changes
non-aggregated fields unpredictably.

That's really an interesting question.

I took a look at the relevant implementation. From the perspective of the
code, "max" always keeps the non-aggregated fields at the values of the first
arrived record, which should be (0, 0, x) in this case. However, when the
window is purged, the state (which keeps the non-aggregated fields of the
first arrived record and the maximum field) is cleared. That means the
"first arrived record" is reset every time a window is purged. That's
why the second column increases unpredictably.

The semantics here are confusing to me.
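
To make the behavior described above concrete, here is a small plain-Java model of it. This is a sketch of the semantics as I read them, not Flink code: the class and method names are made up, records are plain int arrays for a single key, and a "window" is simply a list of records whose state starts empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of rolling max(2) versus maxBy(2) on (f0, f1, f2) records of one key. */
final class MaxSemanticsDemo {

    /** max: keep the first record's other fields, update only the aggregated field. */
    static List<int[]> rollingMax(List<int[]> records, int field) {
        List<int[]> out = new ArrayList<>();
        int[] state = null;
        for (int[] r : records) {
            if (state == null) {
                state = r.clone();              // first arrived record seeds the state
            } else if (r[field] > state[field]) {
                state[field] = r[field];        // other fields stay untouched
            }
            out.add(state.clone());
        }
        return out;
    }

    /** maxBy: keep the whole record that holds the maximum. */
    static List<int[]> rollingMaxBy(List<int[]> records, int field) {
        List<int[]> out = new ArrayList<>();
        int[] best = null;
        for (int[] r : records) {
            if (best == null || r[field] > best[field]) {
                best = r.clone();
            }
            out.add(best.clone());
        }
        return out;
    }

    /** Windowed max: the state is cleared per window, so each window has its own "first record". */
    static List<int[]> windowedMax(List<List<int[]>> windows, int field) {
        List<int[]> out = new ArrayList<>();
        for (List<int[]> window : windows) {
            if (window.isEmpty()) {
                continue;                       // an empty window emits nothing
            }
            List<int[]> rolled = rollingMax(window, field);
            out.add(rolled.get(rolled.size() - 1));
        }
        return out;
    }

    static String format(List<int[]> rows) {
        StringBuilder sb = new StringBuilder();
        for (int[] row : rows) {
            sb.append(Arrays.toString(row)).append(' ');
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        List<int[]> key0 = Arrays.asList(
                new int[]{0, 0, 0}, new int[]{0, 1, 1}, new int[]{0, 2, 2});
        System.out.println("max:          " + format(rollingMax(key0, 2)));
        System.out.println("maxBy:        " + format(rollingMaxBy(key0, 2)));

        // Window contents are an assumption chosen to mimic a 2 s window over a 1 s source.
        List<List<int[]>> windows = Arrays.asList(
                Collections.singletonList(new int[]{0, 0, 0}),
                Arrays.asList(new int[]{0, 1, 1}, new int[]{0, 2, 2}),
                Arrays.asList(new int[]{0, 3, 3}, new int[]{0, 4, 4}));
        System.out.println("windowed max: " + format(windowedMax(windows, 2)));
    }
}
```

With these inputs the non-windowed "max" keeps 0 in the second field, while the windowed variant yields (0,0,0), (0,1,2), (0,3,4), because the second field comes from whichever record happens to arrive first in each window. If the other fields matter, maxBy is probably what you want.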

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Dec 2019 at 17:50, vino yang  wrote:

> Hi weizheng,
>
> IMHO, I am not sure which part is unclear to you. Is the result not correct?
> Can you share the correct result based on your understanding?
>
> The "keyBy" specifies the grouping field, and min/max do the aggregation on
> the other field based on the position you specified.
>
> Best,
> Vino
>
> On Thu, 19 Dec 2019 at 17:00, Lu Weizheng wrote:
>
>> Hi all,
>>
>> On a KeyedStream, when I use maxBy or minBy, I will get the max or min
>> element. It means the other fields will be those of the max or min element.
>> This is quite clear. However, when I use max or min, what does Flink do
>> with the other fields?
>>
>> val tupleStream = senv.fromElements(
>>   (0, 0, 0), (0, 1, 1), (0, 2, 2),
>>   (1, 0, 6), (1, 1, 7), (1, 2, 8)
>> )
>> //  (0,0,0)
>> //  (0,0,1)
>> //  (0,0,2)
>> //  (1,0,6)
>> //  (1,0,7)
>> //  (1,0,8)
>> val maxByStream = tupleStream.keyBy(0).max(2).print()
>>
>> In this case, the second field uses the first element's 0.
>>
>> class IntTupleSource extends RichSourceFunction[(Int, Int, Int)]{
>>
>>   var isRunning: Boolean = true
>>   var i = 0
>>
>>   val rand = new Random()
>>
>>   override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {
>>
>>     while (isRunning) {
>>
>>       // Emit the next record into the SourceContext
>>       srcCtx.collect((0, i, i))
>>       i += 1
>>       Thread.sleep(1000)
>>     }
>>   }
>>
>>   override def cancel(): Unit = {
>>     isRunning = false
>>   }
>> }
>>
>> //(0,0,0)
>> //(0,1,2)
>> //(0,3,4)
>> //(0,5,6)
>> //(0,7,8)
>> //(0,9,10)
>>
>> val maxWindowStream = senv.addSource(new IntTupleSource)
>>   .keyBy(0)
>>   .timeWindow(Time.milliseconds(2000))
>>   .max(2).print()
>>
>>
>>
>> In this case, the result is not so clear...
>>
>> So, for max and min, the two operators cannot guarantee the result of the
>> other fields?
>>
>> Thank you so much if anyone can reply.
>>
>> Weizheng
>>
>


Re: Apache Flink - Flink Metrics - How to distinguish b/w metrics for two job manager on the same host

2019-12-18 Thread Biao Liu
Hi Mans,

That's indeed a problem. We have a plan to fix it. I think it could be
included in 1.11. You could follow this issue [1] to check the progress.

[1] https://issues.apache.org/jira/browse/FLINK-9543

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Dec 2019 at 14:51, vino yang  wrote:

> Hi Mans,
>
> IMO, one job manager represents one Flink cluster, and each Flink cluster
> has its own set of Flink configuration, e.g. the metrics reporter.
>
> Some metrics reporters support tags; you can set them to
> distinguish different Flink clusters. [1]
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter
>
> Best,
> Vino
>
> On Thu, 19 Dec 2019 at 02:54, M Singh wrote:
>
>> Hi:
>>
>> I am using AWS EMR with a Flink application, and two of the job managers
>> are running on the same host. I am looking at the metrics documentation
>> (Apache Flink 1.9 Documentation: Metrics) and see the following:
>>
>>    - metrics.scope.jm
>>       - Default: <host>.jobmanager
>>       - Applied to all metrics that were scoped to a job manager.
>>
>> ...
>> List of all Variables
>>
>>    - JobManager: <host>
>>    - TaskManager: <host>, <tm_id>
>>    - Job: <job_id>, <job_name>
>>    - Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>,
>>      <subtask_index>
>>    - Operator: <operator_id>, <operator_name>, <subtask_index>
>>
>>
>>
>> My question: is there a way to distinguish between the two job managers? I
>> see only the <host> variable for JobManager, and since the two are running
>> on the same host, the value is the same. Is there any other variable that
>> I can use to distinguish the two?
>>
>> For the taskmanager I have the taskmanager id, but I am not sure about the
>> job manager.
>>
>> Thanks
>>
>> Mans
>>
>>


Re: MiniCluster with ProcessingTimeTrigger

2019-12-18 Thread Biao Liu
> env.setParallelism(1);
> CollectSink.values.clear();
>
> List listOfNumbers = IntStream.rangeClosed(1,
> inputSize).boxed().collect(Collectors.toList());
>
> // 1st half of pipeline
> //DataStream> pipeA = env.fromCollection(listOfNumbers)
> DataStream> pipeA = env.addSource(new
> StreamTest.DripFeed(inputSize))
> .windowAll(TumblingProcessingTimeWindows.of(*Time.seconds(2)*))
>
> ...(same as before...)
>
>
>
>
> --
> *From:* Biao Liu 
> *Sent:* Tuesday 17 December 2019 21:50
> *To:* John Morrow 
> *Cc:* user 
> *Subject:* Re: MiniCluster with ProcessingTimeTrigger
>
> Hi John,
>
> The root cause is that the collection source exits too fast, so the window
> also exits without being triggered.
>
> You could verify that by waiting a second before releasing the window. For
> example, insert a map operator between the source and the window operator
> and block for a second or more in the "close" method of this map operator.
> You will see that the window works well.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Wed, 18 Dec 2019 at 06:24, John Morrow 
> wrote:
>
> Hi All,
>
> I'm trying to test a pipeline that consists of two Flink tasks with a
> MiniCluster. The 1st task has a WindowAll operator which groups items into
> batches every second, and the 2nd task does an async operation with each
> batch and flatMaps the result.
>
> I've whittled it down to the bare bones below. There are two tests:
>
>- testPipelineWithCountTrigger - this one works fine 
>- testPipelineWithProcessingTimeTrigger - this one doesn't give any
>output 
>
>
> It seems like a timing issue. If I step through the failing one slowly I
> can see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear
> methods do get called, and the asyncInvoke method also gets called, but
> when I run it the 2nd test fails as it produces no output. I've tried
> setting the MiniCluster timeout to 1 day, the same with my AsyncUDF
> timeout, and sleeping for 3 * window after env.execute but no difference.
> I'm running this with Flink 1.9.0 and OpenJDK8 on Ubuntu (build
> 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).
>
>
> Any idea how I can get the 2nd test to wait to process the output?
>
>
> Thanks 
>
> John.
>
>
>
>
>
>
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import
> org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
> import
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.test.util.MiniClusterWithClientResource;
> import org.apache.flink.util.Collector;
> import org.junit.jupiter.api.Tag;
> import org.junit.jupiter.api.Test;
>
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
> import java.util.stream.StreamSupport;
>
> import static org.junit.jupiter.api.Assertions.assertEquals;
>
>
> public class StreamTest {
>
>   @Test // :)
>   @Tag("unit")
>   public void testPipelineWithCountTrigger() throws Exception {
> runPipeline(10, CountTrigger.of(10));
>   }
>
>   @Test // :(
>   @Tag("unit")
>   public void testPipelineWithProcessingTimeTrigger() throws Exception {
> runPipeline(10, ProcessingTimeTrigger.create());
>   }
>
>
>   private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger) throws Exception {
>
> MiniClusterWithClientResource miniCluster = new
> MiniClusterWithClientResource(
> new MiniClusterResourceConfiguration.Builder()
> .setNumberSlotsPerTaskManager(1)
> 

Re: RichAsyncFunction Timeout

2019-12-17 Thread Biao Liu
Hi Polarisary,

It's hard to tell what happened without further detail. Here are a couple of
guesses.
1. Do you complete the "resultFuture" in "asyncInvoke"? I ask because only
part of the "asyncInvoke" implementation is shown, and I can't see the
completion part.
2. Is the capacity (10) of the async waiting queue large enough? The time an
element spends waiting for a free slot in the queue also counts towards the
timeout. It seems this behavior has been changed in the master branch
recently. I'm not sure whether that change is included in your version.
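
To illustrate the first guess, here is a minimal sketch in plain Java. It is
not the code from this thread: "ResultFutureSketch" is a stand-in for Flink's
ResultFuture reduced to two methods, and "AsyncLookupSketch" and "lookup" are
hypothetical names for the HBase read. The point is only that every path
through "asyncInvoke" ends in a completion call, otherwise the element stays
in the queue until the timeout fires.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Stand-in for Flink's ResultFuture, reduced to the two methods that matter here.
interface ResultFutureSketch<OUT> {
    void complete(Collection<OUT> result);
    void completeExceptionally(Throwable error);
}

// Hypothetical wrapper; "lookup" plays the role of the external read.
final class AsyncLookupSketch {
    private final Function<String, String> lookup;

    AsyncLookupSketch(Function<String, String> lookup) {
        this.lookup = lookup;
    }

    // Mirrors the shape of asyncInvoke: success and failure both complete the future.
    CompletableFuture<Void> asyncInvoke(String input, ResultFutureSketch<String> resultFuture) {
        return CompletableFuture
            .supplyAsync(() -> lookup.apply(input))
            .<Void>handle((value, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(Collections.singletonList(value));
                }
                return null;
            });
    }
}
```

If the callback only handled the success path, a failed lookup would never
complete the future and would show up as a timeout later.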

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Dec 2019 at 11:09, Polarisary  wrote:

> Hi all,
> When I use RichAsyncFunction to read data from HBase, it always times out
> after a few minutes. But the HBase connection is not closed, and I can still
> get data inside the overridden timeout method.
>
> Following is the code. Does somebody know why the timeout is triggered?
>
> 
>
> AsyncDataStream.unorderedWait(uidDs, new AsyncHBaseRequest(hTableName,
> 
> HBaseConfigurationUtil.serializeConfiguration(hbaseClientConf), hbaseSchema)
> , 5, TimeUnit.MINUTES, 10)
>
>
>
> @Override
> public void timeout(Tuple1 input, ResultFuture Short, Short, Long, Integer, Long>> resultFuture) throws Exception {
>
> Row r = 
> readHelper.parseToRow(table.get(readHelper.createGet("13491261515587439bf2f217")));
> logger.error("Timeout Error, input [{}], conn {}, row [{}]", input.f0, 
> hConnection.isClosed(), r.toString());
> }
>
> @Override
> public void asyncInvoke(Tuple1 input, ResultFuture Short, Short, Long, Integer, Long>> resultFuture) throws Exception {
> FamilyFilter filter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new 
> BinaryComparator(Bytes.toBytes("f1")));
> String rkStart = 
> UserInstallAppLookupTableSource.getLeftRowkeyByUid(input.f0, 0);
> String rkEnd = 
> UserInstallAppLookupTableSource.getLeftRowkeyByUid(input.f0, 9L);
>
>
>
>
> polaris...@gmail.com
>
>
>
>
>


Re: MiniCluster with ProcessingTimeTrigger

2019-12-17 Thread Biao Liu
Hi John,

The root cause is that the collection source exits too fast, so the window
operator also exits without ever being triggered.

You could verify that by delaying the shutdown for a second before the window
is released. For example, insert a map operator between the source and the
window operator, and block for a second or more in the "close" method of this
map operator. You should then see the window work as expected.
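
As a rough analogy in plain Java (not Flink code; "TimerShutdownSketch",
"timerDelayMs" and "lingerMs" are made-up names), the sketch below registers a
timer and then shuts its owner down. If the shutdown comes before the timer is
due, the timer is discarded instead of fired, which is how I understand the
window timer to behave when the job finishes early.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-Java analogy: a pending timer is dropped when its owner shuts down first.
final class TimerShutdownSketch {

    // Registers a "window timer" due after timerDelayMs, keeps the "job" alive
    // for lingerMs, then shuts down. Returns true if the timer fired in time.
    static boolean timerFires(long timerDelayMs, long lingerMs) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean fired = new AtomicBoolean(false);
        timers.schedule(() -> fired.set(true), timerDelayMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(lingerMs);   // stands in for blocking in close() of the map operator
            timers.shutdownNow();     // the job finishes: pending timers are discarded
            timers.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            timers.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return fired.get();
    }
}
```

Calling timerFires(500, 0) models the failing test (the source is done long
before the one-second window ends), while timerFires(100, 600) models the
variant with the blocking map operator.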

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Dec 2019 at 06:24, John Morrow  wrote:

> Hi All,
>
> I'm trying to test a pipeline that consists of two Flink tasks with a
> MiniCluster. The 1st task has a WindowAll operator which groups items into
> batches every second, and the 2nd task does an async operation with each
> batch and flatMaps the result.
>
> I've whittled it down to the bare bones below. There are two tests:
>
> - testPipelineWithCountTrigger - this one works fine
> - testPipelineWithProcessingTimeTrigger - this one doesn't give any
>   output
>
>
> It seems like a timing issue. If I step through the failing one slowly I
> can see that the ProcessingTimeTrigger's onElement/onProcessingTime/clear
> methods do get called, and the asyncInvoke method also gets called, but
> when I run it the 2nd test fails as it produces no output. I've tried
> setting the MiniCluster timeout to 1 day, the same with my AsyncUDF
> timeout, and sleeping for 3 * window after env.execute but no difference.
> I'm running this with Flink 1.9.0 and OpenJDK8 on Ubuntu (build
> 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10).
>
>
> Any idea how I can get the 2nd test to wait to process the output?
>
>
> Thanks 
>
> John.
>
>
>
>
>
>
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import
> org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
> import
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.test.util.MiniClusterWithClientResource;
> import org.apache.flink.util.Collector;
> import org.junit.jupiter.api.Tag;
> import org.junit.jupiter.api.Test;
>
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
> import java.util.stream.StreamSupport;
>
> import static org.junit.jupiter.api.Assertions.assertEquals;
>
>
> public class StreamTest {
>
>   @Test // :)
>   @Tag("unit")
>   public void testPipelineWithCountTrigger() throws Exception {
> runPipeline(10, CountTrigger.of(10));
>   }
>
>   @Test // :(
>   @Tag("unit")
>   public void testPipelineWithProcessingTimeTrigger() throws Exception {
> runPipeline(10, ProcessingTimeTrigger.create());
>   }
>
>
>   private void runPipeline(int inputSize, Trigger<Object, TimeWindow> trigger)
>       throws Exception {
>
>     MiniClusterWithClientResource miniCluster = new MiniClusterWithClientResource(
>         new MiniClusterResourceConfiguration.Builder()
>             .setNumberSlotsPerTaskManager(1)
>             .setNumberTaskManagers(1)
>             .setShutdownTimeout(org.apache.flink.api.common.time.Time.of(1, TimeUnit.DAYS))
>             .build()
>     );
>     miniCluster.before();
>
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     CollectSink.values.clear();
>
>     List<Integer> listOfNumbers =
>         IntStream.rangeClosed(1, inputSize).boxed().collect(Collectors.toList());
>
>     // 1st half of pipeline
>     DataStream<List<Integer>> pipeA = env.fromCollection(listOfNumbers)
>         .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
>         .trigger(trigger)
>         .process(new Batcher());
>
>     // 2nd half of pipeline
>     DataStream<Integer> pipeB = AsyncDataStream
>         .unorderedWait(pipeA, new AsyncUDF(), 1L, TimeUnit.DAYS, 1)
>         .flatMap((List<Integer> records, Collector<Integer> out) ->
>             records.forEach(out::collect))
>         .returns(Types.INT);
>     pipeB.addSink(new CollectSink());
>
>     env.execute();
>
>     try {
>       Thread.sleep(1000L * 3);
>     } catch (InterruptedException e) {
>       System.out.println();
>     }
> 

Re: A problem of open in AggregateFunction

2019-12-04 Thread Biao Liu
Hi Guobao,

I just re-checked the DataStream API. There is an interesting restriction
on `AggregateFunction`: it cannot be a rich function. There are already
several relevant issues [1][2][3].
I guess your scenario might be related to this restriction (I assume you
are using the Table/SQL API).

The bad news is that none of these issues has been implemented yet. I didn't
find a workaround for accessing the metric group in the DataStream API, and
I'm not sure whether it can be done in the Table/SQL API.

[1] https://issues.apache.org/jira/browse/FLINK-14275
[2] https://issues.apache.org/jira/browse/FLINK-5094
[3] https://issues.apache.org/jira/browse/FLINK-11198

Thanks,
Biao /'bɪ.aʊ/



On Wed, 4 Dec 2019 at 11:23, Jingsong Li  wrote:

> Hi Guobao,
>
> Looks like this is from table/SQL API.
> You can override public void open(FunctionContext context)
> It should work, can you provide more information? Like:
> - version
> - which planner
> - what problem, open method never being invoked?
>
> Best,
> Jingsong Lee
>
> On Wed, Dec 4, 2019 at 11:09 AM Biao Liu  wrote:
>
>> Hi Guobao,
>>
>> Are you using table API? I'm not familiar with table API, but for data
>> stream API, generally speaking user could do some initialization through
>> "open" method of "Rich" function, like "RichAggregateFunction".
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 3 Dec 2019 at 22:44, Guobao Li  wrote:
>>
>>> Hi community,
>>>
>>>
>>>
>>> I am trying to register a metric in an aggregate UDF by overriding the
>>> *open* function. According to the documentation, the *open* function
>>> can be overridden in order to retrieve the metric group and register
>>> metrics. But this works only for ScalarFunction, not for AggregateFunction,
>>> since the *open* function is not invoked for AggregateFunction. Could
>>> anyone help me with this?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Guobao
>>>
>>>
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: A problem of open in AggregateFunction

2019-12-03 Thread Biao Liu
Hi Guobao,

Are you using table API? I'm not familiar with table API, but for data
stream API, generally speaking user could do some initialization through
"open" method of "Rich" function, like "RichAggregateFunction".

Thanks,
Biao /'bɪ.aʊ/



On Tue, 3 Dec 2019 at 22:44, Guobao Li  wrote:

> Hi community,
>
>
>
> I am trying to register a metric in an aggregate UDF by overriding the
> *open* function. According to the documentation, the *open* function can
> be overridden in order to retrieve the metric group and register
> metrics. But this works only for ScalarFunction, not for AggregateFunction,
> since the *open* function is not invoked for AggregateFunction. Could
> anyone help me with this?
>
>
>
> Thanks,
>
> Guobao
>
>
>


Re: How to recover state from savepoint on embedded mode?

2019-11-29 Thread Biao Liu
Hi Reo,

Maybe we could find another way.

> The reason I am not using standalone mode to run the job is that the
> running environment doesn't have ZooKeeper, and we would not install it. So I
> need to depend on the embedded mode to run my job.

You could set up a standalone cluster without ZooKeeper.
Either leave "high-availability" unset in flink-conf.yaml or set it to "NONE",
and provide "jobmanager.rpc.address" and "jobmanager.rpc.port" in
flink-conf.yaml as well.
This way you can build a standalone cluster; see [1] for more details.
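
A minimal sketch of such a configuration could look like the fragment below.
The host and port are placeholder values I picked for a single-machine setup,
not values from this thread.

```yaml
# flink-conf.yaml (sketch; address and port are placeholders)
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
# Either omit this key or set it to NONE to run without ZooKeeper.
high-availability: NONE
```

With the cluster started, the job can be resumed with `bin/flink run -s` and
the savepoint path, as Yun Tang mentioned earlier in this thread.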

Could it satisfy your requirement?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/cluster_setup.html

Thanks,
Biao /'bɪ.aʊ/



On Fri, 29 Nov 2019 at 18:45, Dawid Wysakowicz 
wrote:

> Hi,
>
> I would like to clarify previous responses a bit.
>
> 1. From the architectural point of view yes it is true it is possible to
> restore from a savepoint from a local jvm as long as this jvm has access to
> the checkpoint.
>
> 2. Unfortunately the configuration you pass to the ctor of
> LocalStreamEnvironment is not passed to the StreamGraphGenerator which sets
> the savepoint configuration. That said, unless I am wrong this approach
> will not work.
>
> 3. There is no easy and officially supported way to do this. The official
> way would be to start a local cluster and submit your job remotely to that
> local cluster, which you can also debug remotely.
>
> I know this is not perfect. A different workaround I can offer would be to
> modify/reuse the LocalExecutionEnvironment a bit.
>
> You can
>
>1.  get a StreamGraph from a StreamExecutionEnvironment (via
>StreamExecutionEnvironment#getStreamGraph),
>2.  generate a JobGraph out of it,
>3.  set the savepoint settings
>4.  and submit it locally to a MiniCluster.
>
> You can reuse the majority of the code from the
> LocalStreamEnvironment#execute(StreamGraph) method. The thing you have to
> add, once you have the jobGraph, is:
>
> jobGraph.setSavepointRestoreSettings(...)
>
> I know this is not the nicest solution, but some of my colleagues are
> currently working on improving the job submission api. (Some of the FLIPs
> around the topic are:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
> and
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> ).
>
> Best,
>
> Dawid
> On 28/11/2019 19:56, Arvid Heise wrote:
>
> Just to add up, if you use LocalStreamEnvironment, you can pass a
> configuration and you can set "execution.savepoint.path" to point to your
> savepoint.
>
> Best,
>
> Arvid
>
> On Wed, Nov 27, 2019 at 1:00 PM Congxian Qiu 
> wrote:
>
>> Hi,
>>
>> You can recover from a checkpoint/savepoint as long as the JM can read from
>> the given path, no matter which mode the job is running in.
>>
>> Best,
>> Congxian
>>
>>
>> On Tue, 26 Nov 2019 at 12:18, Reo Lei wrote:
>>
>>>
>>>
>>> -- Forwarded message -
>>> From: Reo Lei
>>> Date: Tue, 26 Nov 2019 at 09:53
>>> Subject: Re: How to recover state from savepoint on embedded mode?
>>> To: Yun Tang
>>>
>>>
>>> Hi Yun,
>>> Thanks for your reply. What I mean by embedded mode is that the whole Flink
>>> cluster and job, including the jobmanager, the taskmanager and the job
>>> application itself, run within a single local JVM process, using the
>>> "LocalStreamEnvironment" within the job. The start command looks like
>>> this: "java -Xmx512M -XX:... -Dlog.file=... -cp flink-job.jar
>>> com.a.b.c.MyJob > /dev/null &"
>>>
>>> The reason I am not using standalone mode to run the job is that the
>>> running environment doesn't have ZooKeeper, and we would not install it. So I
>>> need to depend on the embedded mode to run my job.
>>>
>>> BR,
>>> Reo
>>>
>>> On Tue, 26 Nov 2019 at 02:38, Yun Tang wrote:
>>>
>>>> What does the embedded mode mean here? If you refer to the SQL embedded
>>>> mode, you cannot resume from a savepoint for now; if you refer to a local
>>>> standalone cluster, you could use `bin/flink run -s` to resume on a local
>>>> cluster.
>>>>
>>>> Best
>>>>
>>>> Yun Tang
>>>>
>>>> *From: *Reo Lei
>>>> *Date: *Tuesday, November 26, 2019 at 12:37 AM
>>>> *To: *"user@flink.apache.org"
>>>> *Subject: *How to recover state from savepoint on embedded mode?
>>>>
>>>> Hi,
>>>>
>>>> I have a job that needs to run in embedded mode, but it needs to
>>>> initialize some rule data from a database before it starts. So I used the
>>>> State Processor API to construct my state data and save it to the local
>>>> disk. When I wanted to use this savepoint to recover my job, I found that
>>>> resuming a job from a savepoint requires the command
>>>> `bin/flink run -s :savepointPath [:runArgs]` to submit the job to a Flink
>>>> cluster. That means the job runs in remote mode, not embedded mode.
>>>>
>>>> And I was wondering why I can't resume a job from a savepoint in
>>>> embedded mode. If that is possible, what should I do?
>>>>
>>>> BTW, if we can not resume a job from a savepoint on embedded mode,
Re: stack job on fail over

2019-11-26 Thread Biao Liu
Hi Nick,

Yes, reducing the heartbeat timeout is not a perfect solution. It just
alleviates the pain a bit.
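
For reference, the two options from my earlier reply would go into
flink-conf.yaml roughly as below. The numbers are illustrative values in
milliseconds that I chose for this sketch, not recommendations, and the
timeout should stay well above the interval.

```yaml
# flink-conf.yaml (sketch; values in milliseconds are illustrative only)
heartbeat.interval: 5000
heartbeat.timeout: 20000
```

A shorter timeout means a lost task manager is noticed sooner, at the cost of
more false alarms when the network or a GC pause delays a heartbeat.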

I'm wondering whether my guess is right. Is the delay caused by heartbeat
detection? Does a more graceful way of shutting down help?

Thanks,
Biao /'bɪ.aʊ/



On Tue, 26 Nov 2019 at 20:22, Nick Toker  wrote:

> Thanks,
> it does the trick.
>
>
> regards,
> nick
>
> On Tue, Nov 26, 2019 at 11:26 AM Biao Liu  wrote:
>
>> Hi Nick,
>>
>> I guess the reason is that your Flink job manager doesn't detect that the
>> task manager is lost until the heartbeat times out.
>> You could check the job manager log to verify that.
>>
>> Maybe a more graceful way of shutting down the task manager helps, such as
>> "taskmanager.sh stop" or the "kill" command without signal 9 (SIGKILL).
>> Or you could reduce the heartbeat interval and timeout through the
>> configuration options "heartbeat.interval" and "heartbeat.timeout".
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 26 Nov 2019 at 16:09, Nick Toker  wrote:
>>
>>> Hi,
>>> I have a standalone cluster with 3 nodes and the RocksDB backend.
>>> When one task manager fails (the process is killed), it takes a very long
>>> time until the job is completely canceled and a new job is resubmitted.
>>> I see that all slots on all nodes are canceled except for the slots of the
>>> dead task manager; it takes about 30-40 seconds for the job to shut down
>>> completely.
>>> Is there something I can do to reduce this time, or is there a plan for a
>>> fix (and if so, when)?
>>>
>>> regards,
>>> nick
>>>
>>


Re: How to custom (or use) a window to specify everyday's beginning as watermark?

2019-11-26 Thread Biao Liu
Hi Rock,

There was an inaccurate statement in my last response. I don't think a
watermark at 00:00 is needed to get an accurate calculation result. A
watermark at 00:00 only helps to emit the result you want immediately.

Thanks,
Biao /'bɪ.aʊ/



On Tue, 26 Nov 2019 at 18:10, Biao Liu  wrote:

> Hi Rock,
>
> From my understanding, what you want is a one-day time-based window which
> starts at 00:00.
> Actually the one-day time-based window (as Jark mentioned) starts at the
> beginning of the day (00:00 UTC). You don't need to do anything special.
>
> If you are using an event-time window (since you mentioned watermarks), the
> only thing you need to do is generate correct watermarks.
> For example, to get an accurate one-day aggregation (one that does not
> involve any record from the next day), a watermark at 00:00 might be needed.
> You may find the answer in [1][2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamp_extractors.html
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 26 Nov 2019 at 17:46, Jark Wu  wrote:
>
>> Hi Rock,
>>
>> Sorry, I don't fully understand what you want.
>> If you want a tumbling window which covers one day,
>> you can use `KeyedStream#timeWindow(Time.days(1))` which covers from UTC
>> 00:00~24:00.
>>
>> Best,
>> Jark
>>
>>
>> On Tue, 26 Nov 2019 at 17:20, Caizhi Weng  wrote:
>>
>>> Hi Rock,
>>>
>>> I think you can write your own trigger which fires when the date of the
>>> process time of the current record is different from that of the last
>>> record.
>>>
>>> Pinging @Jark Wu for a more professional answer.
>>>
>>> On Tue, 26 Nov 2019 at 15:37, Rock wrote:
>>>
>>>> I need my job to aggregate every device's metrics into a daily report. But I
>>>> did not find a window that covers exactly one day, or a way to use the start
>>>> of each day as the watermark. Should I write a custom window, or is there
>>>> another way to achieve this?
>>>>
>>>


Re: How to custom (or use) a window to specify everyday's beginning as watermark?

2019-11-26 Thread Biao Liu
Hi Rock,

From my understanding, what you want is a one-day time-based window which
starts at 00:00.
Actually the one-day time-based window (as Jark mentioned) starts at the
beginning of the day (00:00 UTC). You don't need to do anything special.
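
To make the alignment concrete, here is a small arithmetic sketch in plain
Java. "DailyWindowSketch" and "windowStart" are names I made up; to my
understanding the formula is the same one Flink uses to align tumbling
windows, but please treat it as an illustration rather than the
implementation.

```java
import java.time.Duration;

// Sketch of how the start of a tumbling window can be derived from a timestamp.
final class DailyWindowSketch {
    static final long DAY_MS = Duration.ofDays(1).toMillis();

    // Start of the window of length sizeMs (shifted by offsetMs) that contains timestampMs.
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        return timestampMs - (timestampMs - offsetMs + sizeMs) % sizeMs;
    }
}
```

For a record at 2019-11-26T10:00:00Z, windowStart(ts, 0, DAY_MS) gives
2019-11-26T00:00:00Z. With an offset of -8 hours it gives
2019-11-25T16:00:00Z, which is midnight in UTC+8. If the day should start at
local midnight, the offset can be passed to the window assigner, for example
TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)).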

If you are using an event-time window (since you mentioned watermarks), the
only thing you need to do is generate correct watermarks.
For example, to get an accurate one-day aggregation (one that does not
involve any record from the next day), a watermark at 00:00 might be needed.
You may find the answer in [1][2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamp_extractors.html

Thanks,
Biao /'bɪ.aʊ/



On Tue, 26 Nov 2019 at 17:46, Jark Wu  wrote:

> Hi Rock,
>
> Sorry, I don't fully understand what you want.
> If you want a tumbling window which covers one day,
> you can use `KeyedStream#timeWindow(Time.days(1))` which covers from UTC
> 00:00~24:00.
>
> Best,
> Jark
>
>
> On Tue, 26 Nov 2019 at 17:20, Caizhi Weng  wrote:
>
>> Hi Rock,
>>
>> I think you can write your own trigger which fires when the date of the
>> process time of the current record is different from that of the last
>> record.
>>
>> Pinging @Jark Wu for a more professional answer.
>>
>> On Tue, 26 Nov 2019 at 15:37, Rock wrote:
>>
>>> I need my job to aggregate every device's metrics into a daily report. But I
>>> did not find a window that covers exactly one day, or a way to use the start
>>> of each day as the watermark. Should I write a custom window, or is there
>>> another way to achieve this?
>>>
>>


Re: stack job on fail over

2019-11-26 Thread Biao Liu
Hi Nick,

I guess the reason is that your Flink job manager doesn't detect that the task
manager is lost until the heartbeat times out.
You could check the job manager log to verify that.

Maybe a more graceful way of shutting down the task manager helps, such as
"taskmanager.sh stop" or the "kill" command without signal 9 (SIGKILL).
Or you could reduce the heartbeat interval and timeout through the
configuration options "heartbeat.interval" and "heartbeat.timeout".

Thanks,
Biao /'bɪ.aʊ/



On Tue, 26 Nov 2019 at 16:09, Nick Toker  wrote:

> Hi,
> I have a standalone cluster with 3 nodes and the RocksDB backend.
> When one task manager fails (the process is killed), it takes a very long
> time until the job is completely canceled and a new job is resubmitted.
> I see that all slots on all nodes are canceled except for the slots of the
> dead task manager; it takes about 30-40 seconds for the job to shut down
> completely.
> Is there something I can do to reduce this time, or is there a plan for a
> fix (and if so, when)?
>
> regards,
> nick
>


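Re: "Ask timed out" problem on Mac OS

2019-11-04 Thread Biao Liu
Hello,

Flink does run on macOS. I just tried it myself, and copying your commands works.
I suggest checking your local environment again. Did you build your local Flink
yourself? If that doesn't help, could you try the binary release provided by
Flink [1]?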

[1] https://flink.apache.org/downloads.html

Thanks,
Biao /'bɪ.aʊ/



On Tue, 5 Nov 2019 at 12:30, jeff kit  wrote:

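> Hi everyone,
> I ran into a problem while running the official Flink Quick Start. To avoid
> asking a silly question, I first tried many things, such as switching Flink
> versions (1.7, 1.8 and 1.9). On my own Mac OS X machine the problem always
> occurs, while on other operating systems such as Windows everything works.
>
> This may not be a common problem and is more likely an issue with my local
> environment, but after many days of trying I still haven't found a solution,
> so I am asking for help here.
>
> Steps:
> 1. ./bin/start-cluster.sh  # start Flink
> 2. ./bin/flink run examples/batch/WordCount.jar   # submit the WordCount jar
>
> Then an exception was thrown: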
> Starting execution of program
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 81bc8720dee57710788cc8e41079ba4d)
> at
>
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
> at
>
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> at
>
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:88)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at
> org.apache.flink.client.cli.CliFrontend$$Lambda$31/1990451863.call(Unknown
> Source)
> at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
>
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
> at
>
> org.apache.flink.client.program.rest.RestClusterClient$$Lambda$44/1067599825.apply(Unknown
> Source)
> at
>
> java.util.concurrent.CompletableFuture$ExceptionCompletion.run(CompletableFuture.java:1246)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
> at
>
> java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
> at
>
> java.util.concurrent.CompletableFuture$ThenApply.run(CompletableFuture.java:723)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
> at
>
> java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
> at
>
> java.util.concurrent.CompletableFuture$ThenCopy.run(CompletableFuture.java:1333)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
> at
>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2361)
> at
>
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
> at
>
> org.apache.flink.runtime.concurrent.FutureUtils$$Lambda$63/31844.accept(Unknown
> Source)
> at
>
> java.util.concurrent.CompletableFuture$WhenCompleteCompletion.run(CompletableFuture.java:1298)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
> at
>
> java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
> at
>
> java.util.concurrent.CompletableFuture$AsyncCompose.exec(CompletableFuture.java:626)
> at
>
> java.util.concurrent.CompletableFuture$Async.run(CompletableFuture.java:428)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at 

Re: Can a Trigger know the state of the data in a Window?

2019-11-04 Thread Biao Liu
Hello,

Would countWindow [1] meet your needs?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#countWindow-long-

Thanks,
Biao /'bɪ.aʊ/



On Tue, 5 Nov 2019 at 14:01, Utopia  wrote:

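> Hi everyone,
>
> I want to decide whether to fire based on information about the data in the
> Window, such as the number of elements. How should I implement this? Do I
> have to maintain such state myself?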
>
> Best  regards
> Utopia
>


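Fwd: Does changing the number of YARN containers matter when restoring from state?

2019-11-04 Thread Biao Liu
Hi, this topic should be sent to the user-zh mailing list. I'm just forwarding
it there.

Thanks,
Biao /'bɪ.aʊ/



-- Forwarded message -
From: Yun Tang 
Date: Tue, 5 Nov 2019 at 13:20
Subject: Re: Does changing the number of YARN containers matter when restoring from state?
To: wangl...@geekplus.com.cn , user <
user@flink.apache.org>


Hi

First, check whether the job keeps failing over and whether there are any
exceptions related to "maximum parallelism". If there are, the changed
parallelism is incompatible and the job never actually restored from the
checkpoint.

If the job did restore from the checkpoint successfully, next check whether the
tasks are still restoring data because the parallelism changed. If your state
is large, this step can take quite a while. In that case the source typically
consumes data but cannot send it downstream, and the whole job looks stuck.
You can inspect the call stacks with jstack on the task side and see whether a
restore-related stack is hanging.

If neither of these applies, please use jstack yourself to see what the CPU of
the source and downstream tasks is doing, and decide from there.

Best regards,

Yun Tang


*From: *"wangl...@geekplus.com.cn" 
*Date: *Tuesday, November 5, 2019 at 11:48 AM
*To: *user 
*Subject: *Does changing the number of YARN containers matter when restoring from state?


We consume data from RocketMQ and process it.

The maximum parallelism in the code is 8. When the job is submitted with -ys 4,
2 containers are allocated automatically as task managers.

After running for a while, the job is stopped with a savepoint.

When restoring from the savepoint with -ys 2, 4 containers are allocated as
task managers, but after submission the program no longer consumes data from
RocketMQ and the consumed TPS stays at 0. What is the reason?


Thanks,

Wang Lei


--

wangl...@geekplus.com.cn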

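Re: [metrics] The Availability and Checkpointing metric groups are not shown

2019-11-04 Thread Biao Liu
Hello,

JM metrics should also be reported directly.
You could narrow the problem down to either the metrics themselves or the
reporter.
For example, add an slf4j reporter [1] and check whether the corresponding
metrics show up in the JM log. If they do, the problem is with the reporter.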
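
A sketch of what that could look like in flink-conf.yaml is below. It keeps
the Prometheus settings from your mail and adds a second reporter; the 60
second interval is just a value I picked. I believe the slf4j reporter jar
also has to be copied from ./opt/ to ./lib/, similar to what your Dockerfile
does for the Prometheus one.

```yaml
# flink-conf.yaml (sketch; adds an slf4j reporter next to the existing one)
metrics.reporters: prom, slf4j
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS
```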

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

Thanks,
Biao /'bɪ.aʊ/



On Tue, 22 Oct 2019 at 17:37, Blake  wrote:

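> I use the PrometheusReporter to report metric information and found that
> port 9250 does not show the Availability and Checkpointing metrics.
> Do they need to be configured separately? I didn't see anything about this
> in the documentation.
> I noticed that the scope of both is Job (only available on JobManager).
> Do I need to pass additional parameters at startup?
>
>
>
>
> The configuration is as follows: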
> flink-conf.yml
> metrics.reporters: prom
>
> metrics.reporter.prom.class:
> org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port: 9250
>
> metrics.system-resource: true
>
>
>
>
>
>
>
> docker-compose.yml (excerpt):
> services:
>   jobmanager:
>     # image: flink:1.9.0
>     build: ./job_manager
>     container_name: jobmanager_1.9.0
>     volumes:
>       - ./prometheus/:/etc/prometheus/
>       - prometheus_data:/prometheus
>     ports:
>       - "8081:8081"
>       - "9250:9250"
>     expose:
>       - "6123"
>     networks:
>       - back-tier
>       # - host-tier
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=jobmanager
>   taskmanager:
>     # image: flink:1.9.0
>     build: ./task_manager
>     container_name: taskmanager_1.9.0
>     ports:
>       # - "9001:9001"
>       - "9251:9251"
>     expose:
>       - "6121"
>       - "6122"
>     networks:
>       - back-tier
>       # - host-tier
>     command: taskmanager
>     depends_on:
>       - jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=jobmanager
>
>
>
>
> Docker
> FROM flink:1.9.0
>
> COPY flink-conf.yaml ./conf/
>
> RUN cp ./opt/flink-metrics-prometheus-1.9.0.jar ./lib/


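Re: Re: How to periodically write state to external storage

2019-11-04 Thread Biao Liu
Hello,

I have some questions about your problem description.

> Every incoming message changes the state value; if every message is also
> written to external storage, the downstream can't keep up.
> Is there a way to periodically read the state and write it to external storage?

What do you mean here? Updating the state value and writing to external storage
should be two independent things. State is used internally by Flink; data meant
for external systems is usually written out through a sink and has no direct
relation to state.

From your description, it only looks like the writes to MySQL (through a sink?)
can't keep up. How about writing in batches, for example by buffering in the
sink?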
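
To sketch what buffering in the sink could mean, here is a small plain-Java
helper. It is not a Flink class: "BatchingBufferSketch" and "writer" are
hypothetical names, and the writer stands for whatever performs one batched
write to MySQL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: collects records and hands them to a writer in batches.
final class BatchingBufferSketch<T> {
    private final int batchSize;
    private final Consumer<List<T>> writer;   // e.g. one multi-row INSERT per call
    private final List<T> buffer = new ArrayList<>();

    BatchingBufferSketch(int batchSize, Consumer<List<T>> writer) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.writer = writer;
    }

    // Would be called once per record, e.g. from the sink's invoke().
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Would also be called on close and on checkpoints so nothing is left behind.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        writer.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

With a batch size of 2 and five records, the writer is called twice while the
records arrive and a final flush writes the remaining one. A real sink would
probably also flush on a timer so that a slow stream does not hold records
back for too long.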

If I have misunderstood your question, please describe it in more detail.

Thanks,
Biao /'bɪ.aʊ/



On Fri, 1 Nov 2019 at 11:21, misaki L  wrote:

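> How about aggregating with a window and writing in batches?
>
> wangl...@geekplus.com.cn  wrote on Fri, Nov 1, 2019 at
> 10:17 AM:
>
> > Hi Congxian,
> >
> > Data written out through a sink still has to land somewhere before it can be queried.
> > In our case it is written to MySQL.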
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
> > Sender: Congxian Qiu
> > Send Time: 2019-11-01 10:10
> > Receiver: user-zh
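> > Subject: Re: How to periodically write state to external storage
> > I'm curious why you want to write State to external storage periodically. Does an external system need this State? If so, why not write the State out through a sink?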
> >
> > Best,
> > Congxian
> >
> >
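> > Jun Zhang <825875...@qq.com> wrote on Thu, Oct 31, 2019 at 10:36 AM:
> >
> > > Could you register a timer?
> > >
> > >
> > > Take a look at this article and see whether it helps
> > >
> > >
> > > https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA
> > >  On Oct 31, 2019 at 10:16, wangl...@geekplus.com.cn<
> > wangl...@geekplus.com.cn
> > > wrote:
> > >
> > >
> > > The job is message driven with very high QPS. Every incoming message changes the state value; if every message is also written to external storage, the downstream can't keep up.
> > > Is there a way to periodically read the state and write it to external storage?
> > > I am currently using Flink 1.7.2.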
> > >
> > >
> > >
> > >
> > >
> > > wangl...@geekplus.com.cn
> >
>


Re: [PROPOSAL] Contribute Stateful Functions to Apache Flink

2019-10-12 Thread Biao Liu
Hi Stephan,

+1 for having Stateful Functions in Flink.

Before discussing which repository it should belong to, I was wondering
whether we have reached an agreement on "splitting the Flink repository", as
Piotr mentioned. It seems that discussion simply did not go any further.
It's OK for me to add it to the core repository. After all, almost everything
is in the core repository now. But if we decide to split the core repository
someday, I would prefer to create a separate repository for Stateful
Functions. It might be a good time to take the first step of splitting.

Thanks,
Biao /'bɪ.aʊ/



On Sat, 12 Oct 2019 at 19:31, Yu Li  wrote:

> Hi Stephan,
>
> Big +1 for adding stateful functions to Flink. I believe a lot of user
> would be interested to try this out and I could imagine how this could
> contribute to reduce the TCO for business requiring both streaming
> processing and stateful functions.
>
> And my 2 cents is to put it into flink core repository since I could see a
> tight connection between this library and flink state.
>
> Best Regards,
> Yu
>
>
> On Sat, 12 Oct 2019 at 17:31, jincheng sun 
> wrote:
>
>> Hi Stephan,
>>
>> bit +1 for adding this great features to Apache Flink.
>>
>> Regarding where we should place it, put it into Flink core repository or
>> create a separate repository? I prefer put it into main repository and
>> looking forward the more detail discussion for this decision.
>>
>> Best,
>> Jincheng
>>
>>
>> Jingsong Li  wrote on Sat, Oct 12, 2019 at 11:32 AM:
>>
>>> Hi Stephan,
>>>
>>> big +1 for this contribution. It provides another user interface that is
>>> easy to use and popular at this time. these functions, It's hard for users
>>> to write in SQL/TableApi, while using DataStream is too complex. (We've
>>> done some stateFun kind jobs using DataStream before). With statefun, it is
>>> very easy.
>>>
>>> I think it's also a good opportunity to exercise Flink's core
>>> capabilities. I looked at stateful-functions-flink briefly, it is very
>>> interesting. I think there are many other things Flink can improve. So I
>>> think it's a better thing to put it into Flink, and the improvement for it
>>> will be more natural in the future.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Oct 11, 2019 at 7:33 PM Dawid Wysakowicz 
>>> wrote:
>>>
 Hi Stephan,

 I think this is a nice library, but what I like more about it is that
 it suggests exploring different use-cases. I think it definitely makes
 sense for the Flink community to explore more lightweight applications that
 reuses resources. Therefore I definitely think it is a good idea for Flink
 community to accept this contribution and help maintaining it.

 Personally I'd prefer to have it in a separate repository. There were a
 few discussions before where different people were suggesting to extract
 connectors and other libraries to separate repositories. Moreover I think
 it could serve as an example for the Flink ecosystem website[1]. This could
 be the first project in there and give a good impression that the community
 sees potential in the ecosystem website.

 Lastly, I'm wondering if this should go through PMC vote according to
 our bylaws[2]. In the end the suggestion is to adopt an existing code base
 as is. It also proposes a new programs concept that could result in a shift
 of priorities for the community in a long run.

 Best,

 Dawid

 [1]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html

 [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws
 On 11/10/2019 13:12, Till Rohrmann wrote:

 Hi Stephan,

 +1 for adding stateful functions to Flink. I believe the new set of
 applications this feature will unlock will be super interesting for new and
 existing Flink users alike.

 One reason for not including it in the main repository would to not
 being bound to Flink's release cadence. This would allow to release faster
 and more often. However, I believe that having it eventually in Flink's
 main repository would be beneficial in the long run.

 Cheers,
 Till

 On Fri, Oct 11, 2019 at 12:56 PM Trevor Grant 
 wrote:

> +1 non-binding on contribution.
>
> Separate repo, or feature branch to start maybe? I just feel like in
> the beginning this thing is going to have lots of breaking changes that
> maybe aren't going to fit well with tests / other "v1+" release code. Just
> my .02.
>
>
>
> On Fri, Oct 11, 2019 at 4:38 AM Stephan Ewen  wrote:
>
>> Dear Flink Community!
>>
>> Some of you probably heard it already: On Tuesday, at Flink Forward
>> Berlin, we announced **Stateful Functions**.
>>
>> Stateful Functions is a library on Flink to implement general purpose
>> applications. It is built around stateful functions (who 

[SURVEY] How do you use ExternallyInducedSource or WithMasterCheckpointHook

2019-10-09 Thread Biao Liu
Hi everyone,

I would like to reach out to users who use or are interested in the
`ExternallyInducedSource` or `WithMasterCheckpointHook` interfaces.

The background of this survey is that I'm reworking
`CheckpointCoordinator`. I ran into a problem: the semantics of
`MasterTriggerRestoreHook#triggerCheckpoint` [1] are a bit confusing.
It looks like an asynchronous invocation (the returned value is a future), but
according to the Javadoc, the implementation may be synchronous (possibly
blocking) or asynchronous. That is OK for now. However, having to handle
synchronous and asynchronous invocations at the same time makes the
optimization more complicated [2].

I want to make the semantics clearer. Here are some options.
1. Keep this method as it is, but emphasize in the Javadoc and release
notes that it should be a non-blocking operation and that any heavy IO
should be executed asynchronously through the given executor. Otherwise
there might be a performance issue. This option keeps compatibility.
2. Change the signature of this method. There would be no executor and no
completable future in this method, and it could block for a while. We
would execute it in an executor on the outside. This also makes things easier,
but it breaks compatibility.

At this moment, I personally lean towards option 1.
If you depend on these interfaces, please let me know your opinion. Any
feedback is welcome!
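
To illustrate what option 1 asks of implementers, here is a rough sketch in plain Java. `NonBlockingHookSketch` and `slowExternalCall` are made-up names, and only the shape of `triggerCheckpoint` (checkpoint id, timestamp, executor, future as result) is meant to resemble the hook method. The call returns immediately and the heavy work runs on the executor that was passed in.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a master hook; not the real Flink interface. */
final class NonBlockingHookSketch {

    /** Pretend this is heavy IO, e.g. asking an external system to snapshot itself. */
    static String slowExternalCall(long checkpointId) {
        return "external-state-" + checkpointId;
    }

    /** Returns right away; the heavy IO is handed to the given executor. */
    static CompletableFuture<String> triggerCheckpoint(
            long checkpointId, long timestamp, Executor executor) {
        return CompletableFuture.supplyAsync(() -> slowExternalCall(checkpointId), executor);
    }
}
```

A blocking implementation would instead call `slowExternalCall` directly and return an already completed future, which is the case that complicates the coordinator.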

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
[2] https://issues.apache.org/jira/browse/FLINK-14344

Thanks,
Biao /'bɪ.aʊ/


Re: map can't return null

2019-09-29 Thread Biao Liu
Hi Allan,

It's not a bug. Flink does not support null values, see discussion [1].

In your example, you have to return something from the MapFunction even if
there is nothing to return. Maybe you could use flatMap instead of map to
handle this null value scenario. With FlatMapFunction it is allowed to collect
nothing (just skip collecting when there is no data to return). Does it
satisfy your requirement?
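
A minimal sketch of the flatMap idea, written so that it runs without Flink on the classpath: the `Collector` and `FlatMapFunction` interfaces below are small stand-ins that only mirror the shape of the Flink ones, and `SkipBlank` is a made-up example function. In a real job you would implement Flink's own `FlatMapFunction` instead.

```java
/** Stand-in mirroring the shape of Flink's Collector (assumption, not the real class). */
interface Collector<T> {
    void collect(T record);
}

/** Stand-in mirroring the shape of Flink's FlatMapFunction (assumption, not the real class). */
interface FlatMapFunction<I, O> {
    void flatMap(I value, Collector<O> out) throws Exception;
}

/** Emits the upper-cased value, or nothing at all for blank input, instead of returning null. */
final class SkipBlank implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) {
        if (value != null && !value.trim().isEmpty()) {
            out.collect(value.toUpperCase());
        }
        // Nothing collected means the record is dropped; no null travels downstream.
    }
}
```

Since nothing is emitted for the "no result" case, the later `filter(f -> f != null)` step is no longer needed.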

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Connectors-and-NULL-handling-td29695.html#a29942

Thanks,
Biao /'bɪ.aʊ/



On Sun, 29 Sep 2019 at 16:48, Abhishek Jain  wrote:

> Hi Allan,
> Map does support null but tuple serializer does not. You might want to use
> pojo or row types if you need to deal with null values. Read more here
> 
> .
>
> - Abhishek
>
> On Sun, 29 Sep 2019 at 14:01, allan <18612537...@163.com> wrote:
>
>> Hi guys,
>>
>> When I  use  like the code,
>>
>> .map(new MapFunction>() {
>>   @Override
>>   public Tuple2 map(String value) throws Exception {
>>     if (properties != null) {
>>
>>       return new Tuple2<>(cv_no, json.toJSONString());
>>
>>     }
>>
>>
>>     return null;
>>
>>   }
>> })
>>
>> next,
>>
>> .filter(f->f!=null)
>>
>>
>>
>> I submit my job ,  then the job throws an exception as follows.
>>
>> java.lang.NullPointerException
>>
>>at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
>>
>>at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
>>
>>at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>
>>at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>
>>at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>
>>at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>>
>>at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>>
>>at
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>
>>at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>
>>at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>
>>at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>
>>at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>>
>>at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>>
>>at
>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
>>
>>at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>
>>at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>
>>at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>
>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>
>>at java.lang.Thread.run(Thread.java:748)
>>
>> .
>>
>>
>>
>> I found this method. The record is null, so the job threw an exception. Why
>> can't map return null? Is this a bug?
>>
>> protected  void pushToOperator(StreamRecord record) {
>>   try {
>>     // we know that the given outputTag matches our OutputTag so the record
>>     // must be of the type that our operator (and Serializer) expects.
>>     @SuppressWarnings("unchecked")
>>     StreamRecord castRecord = (StreamRecord) record;
>>
>>     numRecordsIn.inc();
>>     StreamRecord copy = castRecord.copy(serializer.copy(castRecord.getValue()));
>>     operator.setKeyContextElement1(copy);
>>     operator.processElement(copy);
>>   } catch (ClassCastException e) {
>>     if (outputTag != null) {
>>       // Enrich error message
>>       ClassCastException replace = new ClassCastException(
>>         String.format(
>>           "%s. Failed to push OutputTag with id '%s' to operator. " +
>>             "This can occur when multiple OutputTags with different types " +
>>             "but identical names are being used.",
>>           e.getMessage(),
>>           outputTag.getId()));
>>
>>       throw new 

Re: Could not forward element to next operator

2019-09-29 Thread Biao Liu
The problem is probably in the part that was omitted. Check the bottom-most "Caused by".

Thanks,
Biao /'bɪ.aʊ/



On Sun, 29 Sep 2019 at 13:17, <18612537...@163.com> wrote:

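> I have read that, but it is not my problem. The job does not set watermarks and runs normally; recently it reports this exception after running for perhaps a bit more than a day.
>
> Sent from my iPhone
>
> > On Sep 29, 2019, at 11:49 AM, Wesley Peng  wrote: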
> >
> > Hello,
> >
> > May this article match your issue?
> > https://blog.csdn.net/qq_41910230/article/details/90411237
> >
> > regards.
> >
> >> On Sun, Sep 29, 2019 at 10:33 AM allan <18612537...@163.com> wrote:
> >>
> >> Hi,
> >>
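> >> Recently my job keeps reporting errors. My window is a one-minute window. What is the cause? Can anyone help? The Flink version is 1.6 and the error is as follows: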
> >>
> >>
> >>
> >>
> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> >> Could not forward element to next operator}
> >>
> >>   at
> >>
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
> >>
> >>   at
> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>
> >>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >>
> >>   at
> >>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >>
> >>   at
> >>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >>
> >>   at
> >>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>
> >>   at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>
> >>   at java.lang.Thread.run(Thread.java:748)
> >>
> >> Caused by:
> >>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> >> Could not forward element to next operator
> >>
> >>   at org.apache.flink.streaming.
> >>
> >>
> >>
> runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> >>
> >>   at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> >>
> >>   at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> >>
> >>   at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>
>
>


Re: Classloader isolation of task slots in a task manager

2019-09-29 Thread Biao Liu
Within the same TM, subtasks of the same job share one classloader.
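
Because the classloader is shared, static state of a class is initialised once per TM process for that job, not once per subtask. A small sketch of a lazily initialised holder follows; `BaseData`, its placeholder content and the `LOADS` counter are made up for illustration. Each TM is its own JVM, so every TM still loads the data once for itself.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder for shared reference data; initialised once per classloader. */
final class BaseData {
    /** Counts how often the data was loaded; only here to make the behaviour visible. */
    static final AtomicInteger LOADS = new AtomicInteger();

    private static final class Holder {
        // Runs once, on first use of Holder, guarded by the JVM's class initialisation.
        static final Map<String, String> DATA = load();
    }

    private static Map<String, String> load() {
        LOADS.incrementAndGet();
        Map<String, String> m = new HashMap<>();
        m.put("k", "v"); // placeholder content
        return Collections.unmodifiableMap(m);
    }

    /** Every subtask loaded by the same classloader sees the same map instance. */
    static Map<String, String> get() {
        return Holder.DATA;
    }
}
```

Any mutable state kept this way is shared by all subtasks of the job in that TM, so it has to be thread-safe or read-only.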

Thanks,
Biao /'bɪ.aʊ/



On Sat, 28 Sep 2019 at 09:30, Ever <439674...@qq.com> wrote:

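> I have a job with 2 tasks, each with 3 subtasks (parallelism 3), as shown in the figure below.
>
> Each subtask occupies a task slot, but subtasks of different tasks of the same job can share one task slot,
> so here each task slot should hold 2 subtasks.
> For the two subtasks sharing the same task slot, is their classloader the same one,
> or does each subtask have its own classloader?
>
> My job uses a static class (a Scala object or a Java singleton)
> with a collection member that holds reference data. I would like to know whether this variable has to be initialized in each subtask, or only once per JVM.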
>
>


Re: Recommended approach to debug this

2019-09-24 Thread Biao Liu
Hi Zili,

Great to hear that!
Hope to see the new client soon!

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 19:23, Zili Chen  wrote:

> Actually there is an ongoing client API refactoring on this stuff[1] and
> one of the main purpose is
> eliminating hijacking env.execute...
>
> Best,
> tison.
>
> [1]
> https://lists.apache.org/x/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>
>
> Biao Liu  wrote on Tue, Sep 24, 2019 at 7:12 PM:
>
>> So I believe (I didn't test it) the solution for this case is keeping the
>> original exception thrown from `env.execute()` and throwing this exception
>> out of main method.
>> It's a bit tricky, maybe we could have a better design of this scenario.
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 24 Sep 2019 at 18:55, Biao Liu  wrote:
>>
>>> The key point of this case is in `PackagedProgram#callMainMethod`.
>>> The `ProgramAbortException` is expected when executing the main method
>>> here. This `ProgramAbortException` thrown is wrapped with
>>> `InvocationTargetException` by Java reflection layer [1]. There is a piece
>>> of codes handling `InvocationTargetException`.
>>>
>>> try {
>>>   mainMethod.invoke(null, (Object) args);
>>> }
>>> catch (...
>>> catch (InvocationTargetException e) {
>>>   Throwable exceptionInMethod = e.getTargetException();
>>>   if (exceptionInMethod instanceof Error) {
>>> throw (Error) exceptionInMethod;*-->* 
>>> *`ProgramAbortException`
>>> would be caught expectedly here.*
>>>   } else if (exceptionInMethod instanceof
>>> ProgramParametrizationException) {
>>> throw (ProgramParametrizationException) exceptionInMethod;
>>>   } else if (exceptionInMethod instanceof ProgramInvocationException) {
>>> throw (ProgramInvocationException) exceptionInMethod;
>>>   } else { *--> If I'm right, the wrapped exception (Boxed
>>> Error or something else) change the exception, it is caught here*
>>> throw new ProgramInvocationException("The main method caused an
>>> error: " + exceptionInMethod.getMessage(), exceptionInMethod);
>>>   }
>>>
>>> The `ProgramInvocationException` is handled specially in
>>> `OptimizerPlanEnvironment`.
>>>
>>> try {
>>>   prog.invokeInteractiveModeForExecution();
>>> }
>>> catch (ProgramInvocationException e) {
>>>   throw e;   *--> The submission is failed here in this case*
>>> }
>>> catch (Throwable t) {
>>>   // the invocation gets aborted with the preview plan
>>>   if (optimizerPlan != null) {
>>> return optimizerPlan;*--> Normally it
>>> should be here*
>>>   } else {
>>> throw new ProgramInvocationException("The program caused an error:
>>> ", t);
>>>   } ...
>>>
>>> 1.
>>> https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh 
>>> wrote:
>>>
>>>> Well, I think I got the solution though I am not yet sure of the
>>>> problem .. The original code looked like this ..
>>>>
>>>> Try {
>>>>   // from a parent class called Runner which runs a streamlet
>>>>   // run returns an abstraction which completes a Promise depending on
>>>> whether
>>>>   // the Job was successful or not
>>>>   val streamletExecution =
>>>> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>>
>>>>   // the runner waits for the execution to complete
>>>>   // In normal circumstances it will run forever for streaming data
>>>> source unless
>>>>   // being stopped forcibly or any of the queries faces an exception
>>>>   Await.result(streamletExecution.completed, Duration.Inf)
>>>> } match { //..
>>>>
>>>> and then the streamlet.run(..) in turn finally invoked the following ..
>>>>
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>
>>>> // creates datastreams and read from / writes to Kafka
>>>> // I pasted the body of this earlier in the thread
>>>> buildExecutionGraph()
>>>>
>>>> env.execute(..)
>>>>

Re: Have trouble on running flink

2019-09-24 Thread Biao Liu
Hi Russell,

I don't think `BackendBuildingException` is the root cause. In your case, this
exception appears while the task is being cancelled.

Have you checked the log of the YARN node manager? There should be an exit
code for the container. It is also quite possible that the container was killed
by the YARN node manager.

BTW, I think we should discuss this on the flink-user mailing list, not the dev
mailing list. I will forward this mail there.

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 19:19, Russell Bie  wrote:

> Hi Flink team,
>
> I am trying to submit flink job (version 1.8.2) with RocksDB backend to my
> own yarn cluster (hadoop version 2.6.0-cdh5.7.3), the job always failed
> after running for a few hours with the connection loss of some
> taskmanagers. Here<
> https://stackoverflow.com/questions/58046847/ioexception-when-taskmanager-restored-from-rocksdb-state-in-hdfs>
> is the question details on the stackoverflow. I am just wondering if you
> could provide some advice on this issue?
>
> Thanks,
> Russell
>
>


Re: Recommended approach to debug this

2019-09-24 Thread Biao Liu
So I believe (I didn't test it) the solution for this case is to keep the
original exception thrown from `env.execute()` and to throw this exception
out of the main method.
It's a bit tricky; maybe we could have a better design for this scenario.

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 18:55, Biao Liu  wrote:

> The key point of this case is in `PackagedProgram#callMainMethod`.
> The `ProgramAbortException` is expected when executing the main method
> here. This `ProgramAbortException` thrown is wrapped with
> `InvocationTargetException` by Java reflection layer [1]. There is a piece
> of codes handling `InvocationTargetException`.
>
> try {
>   mainMethod.invoke(null, (Object) args);
> }
> catch (...
> catch (InvocationTargetException e) {
>   Throwable exceptionInMethod = e.getTargetException();
>   if (exceptionInMethod instanceof Error) {
> throw (Error) exceptionInMethod;*-->* *`ProgramAbortException`
> would be caught expectedly here.*
>   } else if (exceptionInMethod instanceof ProgramParametrizationException)
> {
> throw (ProgramParametrizationException) exceptionInMethod;
>   } else if (exceptionInMethod instanceof ProgramInvocationException) {
> throw (ProgramInvocationException) exceptionInMethod;
>   } else { *--> If I'm right, the wrapped exception (Boxed Error
> or something else) change the exception, it is caught here*
> throw new ProgramInvocationException("The main method caused an error:
> " + exceptionInMethod.getMessage(), exceptionInMethod);
>   }
>
> The `ProgramInvocationException` is handled specially in
> `OptimizerPlanEnvironment`.
>
> try {
>   prog.invokeInteractiveModeForExecution();
> }
> catch (ProgramInvocationException e) {
>   throw e;   *--> The submission is failed here in this case*
> }
> catch (Throwable t) {
>   // the invocation gets aborted with the preview plan
>   if (optimizerPlan != null) {
> return optimizerPlan;*--> Normally it should
> be here*
>   } else {
> throw new ProgramInvocationException("The program caused an error: ",
> t);
>   } ...
>
> 1.
> https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh 
> wrote:
>
>> Well, I think I got the solution though I am not yet sure of the problem
>> .. The original code looked like this ..
>>
>> Try {
>>   // from a parent class called Runner which runs a streamlet
>>   // run returns an abstraction which completes a Promise depending on
>> whether
>>   // the Job was successful or not
>>   val streamletExecution =
>> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>
>>   // the runner waits for the execution to complete
>>   // In normal circumstances it will run forever for streaming data
>> source unless
>>   // being stopped forcibly or any of the queries faces an exception
>>   Await.result(streamletExecution.completed, Duration.Inf)
>> } match { //..
>>
>> and then the streamlet.run(..) in turn finally invoked the following ..
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> // creates datastreams and read from / writes to Kafka
>> // I pasted the body of this earlier in the thread
>> buildExecutionGraph()
>>
>> env.execute(..)
>>
>> This DID NOT run and failed with the exception I reported earlier. But
>> when I change the code to get the run statement out of the Try block,
>> things run fine .. like this ..
>>
>> // from a parent class called Runner which runs a streamlet
>> // run returns an abstraction which completes a Promise depending on
>> whether
>> // the Job was successful or not
>> val streamletExecution =
>> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>
>> Try {
>>   // the runner waits for the execution to complete
>>   // In normal circumstances it will run forever for streaming data
>> source unless
>>   // being stopped forcibly or any of the queries faces an exception
>>   Await.result(streamletExecution.completed, Duration.Inf)
>> } match { //..
>>
>> Apparently it looks like the exception that I was facing earlier leaked
>> through the Flink engine and Try caught it and it got logged. But removing
>> it out of Try now enables Flink to catch it back and follow the course that
>> it should. But I am not sure if this is a cogent explanation and looking
>> forward to some more accurate one from the experts. Note there is no
>> asynchro

Re: Recommended approach to debug this

2019-09-24 Thread Biao Liu
The key point of this case is in `PackagedProgram#callMainMethod`.
The `ProgramAbortException` is expected when executing the main method
here. The thrown `ProgramAbortException` is wrapped in an
`InvocationTargetException` by the Java reflection layer [1]. There is a piece
of code handling `InvocationTargetException`.

try {
  mainMethod.invoke(null, (Object) args);
}
catch (...
catch (InvocationTargetException e) {
  Throwable exceptionInMethod = e.getTargetException();
  if (exceptionInMethod instanceof Error) {
    // --> `ProgramAbortException` would be caught here, as expected.
    throw (Error) exceptionInMethod;
  } else if (exceptionInMethod instanceof ProgramParametrizationException) {
    throw (ProgramParametrizationException) exceptionInMethod;
  } else if (exceptionInMethod instanceof ProgramInvocationException) {
    throw (ProgramInvocationException) exceptionInMethod;
  } else {
    // --> If I'm right, the wrapping (Boxed Error or something else)
    // changes the exception, so it is caught here.
    throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod);
  }

The `ProgramInvocationException` is handled specially in
`OptimizerPlanEnvironment`.

try {
  prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
  throw e;  // --> The submission fails here in this case
}
catch (Throwable t) {
  // the invocation gets aborted with the preview plan
  if (optimizerPlan != null) {
    return optimizerPlan;  // --> Normally it should end up here
  } else {
    throw new ProgramInvocationException("The program caused an error: ", t);
  } ...

[1]
https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
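
The wrapping done by the reflection layer can be reproduced in a few lines of plain Java. This is only an illustration: `ReflectionWrapDemo`, `fakeMain` and `AbortError` are made-up names, with `AbortError` standing in for an `Error` that is thrown on purpose from the main method.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

final class ReflectionWrapDemo {

    /** Stand-in for an Error that is thrown on purpose from the main method. */
    static final class AbortError extends Error {}

    static void fakeMain(String[] args) {
        throw new AbortError();
    }

    /** Invokes fakeMain reflectively and returns the throwable it originally threw. */
    static Throwable invokeAndUnwrap() {
        try {
            Method m = ReflectionWrapDemo.class.getDeclaredMethod("fakeMain", String[].class);
            m.invoke(null, (Object) new String[0]);
            return null; // not reached here, fakeMain always throws
        } catch (InvocationTargetException e) {
            // Reflection wraps whatever the target method threw; unwrap it again.
            return e.getTargetException();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The caller only gets the original `Error` back through `getTargetException()`, which is why the `instanceof` checks are done on the unwrapped throwable.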

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh 
wrote:

> Well, I think I got the solution though I am not yet sure of the problem
> .. The original code looked like this ..
>
> Try {
>   // from a parent class called Runner which runs a streamlet
>   // run returns an abstraction which completes a Promise depending on
> whether
>   // the Job was successful or not
>   val streamletExecution =
> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>
>   // the runner waits for the execution to complete
>   // In normal circumstances it will run forever for streaming data source
> unless
>   // being stopped forcibly or any of the queries faces an exception
>   Await.result(streamletExecution.completed, Duration.Inf)
> } match { //..
>
> and then the streamlet.run(..) in turn finally invoked the following ..
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> // creates datastreams and read from / writes to Kafka
> // I pasted the body of this earlier in the thread
> buildExecutionGraph()
>
> env.execute(..)
>
> This DID NOT run and failed with the exception I reported earlier. But
> when I change the code to get the run statement out of the Try block,
> things run fine .. like this ..
>
> // from a parent class called Runner which runs a streamlet
> // run returns an abstraction which completes a Promise depending on
> whether
> // the Job was successful or not
> val streamletExecution =
> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>
> Try {
>   // the runner waits for the execution to complete
>   // In normal circumstances it will run forever for streaming data source
> unless
>   // being stopped forcibly or any of the queries faces an exception
>   Await.result(streamletExecution.completed, Duration.Inf)
> } match { //..
>
> Apparently it looks like the exception that I was facing earlier leaked
> through the Flink engine and Try caught it and it got logged. But removing
> it out of Try now enables Flink to catch it back and follow the course that
> it should. But I am not sure if this is a cogent explanation and looking
> forward to some more accurate one from the experts. Note there is no
> asynchrony of concurrency going on here - the Runner code may look a bit
> over-engineered but there is a context to this. The Runner code handles not
> only Flink but other types of streaming engines as well like Spark and Akka
> Streams.
>
> regards.
>
>
> On Tue, Sep 24, 2019 at 10:17 AM Biao Liu  wrote:
>
>> Hi Zili,
>>
>> Thanks for pointing that out.
>> I didn't realize that it's a REST API based case. Debasish's case has
>> been discussed not only in this thread...
>>
>> It's really hard to analyze the case without the full picture.
>>
>> I think the reason of why `ProgramAbortException` is not caught is that
>> he did something outside `env.execute`. Like executing this piece of codes
>> inside a S

Re: Recommended approach to debug this

2019-09-23 Thread Biao Liu
Hi Zili,

Thanks for pointing that out.
I didn't realize that it's a REST API based case. Debasish's case has been
discussed in more than just this thread...

It's really hard to analyze the case without the full picture.

I think the reason why `ProgramAbortException` is not caught is that he
did something outside `env.execute`, such as executing this piece of code
inside a Scala future.

I guess the scenario is that he is submitting the job through the REST API, but
in the main method he wraps `env.execute` in a Scala future instead of
executing it directly.
The env has been set to `StreamPlanEnvironment` because
`JarHandlerUtils` retrieves the job graph through it.
And the `ProgramAbortException` is not thrown out, because the Scala future
captures this exception.
So retrieving the job graph fails due to an unrecognized exception (Boxed
Error).
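
The effect I am guessing at can be shown with a Java analogue. Note this uses `CompletableFuture`, not a Scala future, and it is not Debasish's code; `FutureSwallowDemo` and `AbortError` are made-up names. An `Error` thrown inside the future does not propagate to the code that started it; it only shows up later, wrapped in another exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class FutureSwallowDemo {

    /** Stand-in for an Error that the framework expects to see directly. */
    static final class AbortError extends Error {}

    /** Runs the throwing task inside a future and returns what the caller actually observes. */
    static Throwable observe() {
        // The Error is captured by the future instead of unwinding the calling thread.
        CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
            throw new AbortError();
        });
        try {
            f.join();
            return null; // not reached here, the task always fails
        } catch (CompletionException e) {
            return e; // the original Error is only reachable via getCause()
        }
    }
}
```

Code that checks for the original exception type sees only the wrapper, which matches the unrecognized exception described above.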

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 10:44, Zili Chen  wrote:

> Hi Biao,
>
> The log below already infers that the job was submitted via REST API and I
> don't think it matters.
>
> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$
> JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$
> getJobGraphAsync$6(JarRunHandler.java:142)
>
> What I don't understand it that flink DOES catch the exception at the
> point it is reported thrown...
>
> Best,
> tison.
>
>
> Biao Liu  wrote on Tue, Sep 24, 2019 at 10:34 AM:
>
>>
>> > We submit the code through Kubernetes Flink Operator which uses the
>> REST API to submit the job to the Job Manager
>>
>> So you are submitting job through REST API, not Flink client? Could you
>> explain more about this?
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh 
>> wrote:
>>
>>> Hi Dian -
>>>
>>> We submit one job through the operator. We just use the following to
>>> complete a promise when the job completes ..
>>>
>>>   Try {
>>> createLogic.executeStreamingQueries(ctx.env)
>>>   }.fold(
>>> th ⇒ completionPromise.tryFailure(th),
>>> _ ⇒ completionPromise.trySuccess(Dun)
>>>   )
>>>
>>> If we totally do away with the promise and future stuff then we don't
>>> get the boxed error - only the exception reported in Caused By.
>>>
>>> regards.
>>>
>>> On Mon, Sep 23, 2019 at 10:20 PM Dian Fu  wrote:
>>>
>>>> Hi Debasish,
>>>>
>>>> In which case will the exception occur? Does it occur when you submit
>>>> one job at a time or when multiple jobs are submitted at the same time? I'm
>>>> asking this because I noticed that you used Future to execute the job
>>>> unblocking. I guess ThreadLocal doesn't work well in this case.
>>>>
>>>> Regards,
>>>> Dian
>>>>
>>>> On Sep 23, 2019, at 11:57 PM, Debasish Ghosh  wrote:
>>>>
>>>> Hi tison -
>>>>
>>>> Please find my response below in >>.
>>>>
>>>> regards.
>>>>
>>>> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen  wrote:
>>>>
>>>>> Hi Debasish,
>>>>>
>>>>> The OptimizerPlanEnvironment.ProgramAbortException should be caught
>>>>> at OptimizerPlanEnvironment#getOptimizedPlan
>>>>> in its catch (Throwable t) branch.
>>>>>
>>>>
>>>> >> true but what I get is a StreamPlanEnvironment. From my code I am
>>>> only doing val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>> .
>>>>
>>>>>
>>>>> It should always throw a ProgramInvocationException instead of
>>>>> OptimizerPlanEnvironment.ProgramAbortException if any
>>>>> exception is thrown in the main method of your code.
>>>>>
>>>>> Another important problem is how the code is executed, (set context
>>>>> environment should be another flink internal operation)
>>>>> but given that you submit the job via flink k8s operator it might
>>>>> require time to take a look at k8s operator implementation.
>>>>>
>>>>
>>>> >> We submit the code through Kubernetes Flink Operator which uses the
>>>> REST API to submit the job to the Job Manager
>>>>
>>>>>
>>>>> However, given we catch Throwable in the place this exception thrown,
>>>>> I highly suspect whether it is executed by an of

Re: Flink job manager doesn't remove stale checkmarks

2019-09-23 Thread Biao Liu
Hi Clay,

Sorry, I don't get your point. I'm not sure what exactly "stale checkmarks"
means. Do you mean the HA storage and checkpoint directories that are left
behind after shutting down the cluster?

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 03:12, Clay Teeter  wrote:

> I'm trying to get my standalone cluster to remove stale checkmarks.
>
> The cluster is composed of a single job and task manager backed by rocksdb
> with high availability.
>
> The configuration on both the job manager and the task manager is:
>
> state.backend: rocksdb
> state.checkpoints.dir: file:///opt/ha/49/checkpoints
> state.backend.incremental: true
> state.checkpoints.num-retained: 3
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 2048m
> taskmanager.numberOfTaskSlots: 24
> parallelism.default: 1
> high-availability.jobmanager.port: 6123
> high-availability.zookeeper.path.root: _49
> high-availability: zookeeper
> high-availability.storageDir: file:///opt/ha/49/ha
> high-availability.zookeeper.quorum: **t:2181
>
> Both machines have access to /opt/ha/49 and /opt/ha/49/checkpoints via NFS,
> and both directories are owned by the flink user. Also, there are no errors
> that I can find.
>
> Does anyone have any ideas that I could try?
>
>


Re: Recommended approach to debug this

2019-09-23 Thread Biao Liu
> We submit the code through Kubernetes Flink Operator which uses the REST
API to submit the job to the Job Manager

So you are submitting the job through the REST API, not the Flink client?
Could you explain more about this?

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh 
wrote:

> Hi Dian -
>
> We submit one job through the operator. We just use the following to
> complete a promise when the job completes ..
>
>   Try {
> createLogic.executeStreamingQueries(ctx.env)
>   }.fold(
> th ⇒ completionPromise.tryFailure(th),
> _ ⇒ completionPromise.trySuccess(Dun)
>   )
>
> If we totally do away with the promise and future stuff then we don't get
> the boxed error - only the exception reported in Caused By.
>
> regards.
>
> On Mon, Sep 23, 2019 at 10:20 PM Dian Fu  wrote:
>
>> Hi Debasish,
>>
>> In which case will the exception occur? Does it occur when you submit one
>> job at a time or when multiple jobs are submitted at the same time? I'm
>> asking this because I noticed that you used a Future to execute the job
>> without blocking. I guess ThreadLocal doesn't work well in this case.
>>
>> Regards,
>> Dian
>>
>> On 23 Sep 2019, at 11:57 PM, Debasish Ghosh  wrote:
>>
>> Hi tison -
>>
>> Please find my response below in >>.
>>
>> regards.
>>
>> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen  wrote:
>>
>>> Hi Debasish,
>>>
>>> The OptimizerPlanEnvironment.ProgramAbortException should be caught at
>>> OptimizerPlanEnvironment#getOptimizedPlan
>>> in its catch (Throwable t) branch.
>>>
>>
>> >> true but what I get is a StreamPlanEnvironment. From my code I am only
>> doing val env = StreamExecutionEnvironment.getExecutionEnvironment.
>>
>>>
>>> It should always throw a ProgramInvocationException instead of
>>> OptimizerPlanEnvironment.ProgramAbortException if any
>>> exception is thrown in the main method of your code.
>>>
>>> Another important problem is how the code is executed, (set context
>>> environment should be another flink internal operation)
>>> but given that you submit the job via flink k8s operator it might
>>> require time to take a look at k8s operator implementation.
>>>
>>
>> >> We submit the code through Kubernetes Flink Operator which uses the
>> REST API to submit the job to the Job Manager
>>
>>>
>>> However, given that we catch Throwable in the place where this exception is
>>> thrown, I highly doubt that it is being executed by an official
>>> Flink release.
>>>
>>
>> >> It is an official Flink release 1.9.0
>>
>>>
>>> A complete version of the code and the submission process would be helpful.
>>> Besides, what is the return type of buildExecutionGraph?
>>> I think it is not ExecutionGraph in Flink...
>>>
>>
>> >> buildExecutionGraph is our function which returns Unit. It's not
>> ExecutionGraph. It builds the DataStreams by reading from Kafka and then
>> finally writes to Kafka.
>>
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> On Mon, 23 Sep 2019 at 8:21 PM, Debasish Ghosh  wrote:
>>>
 This is the complete stack trace which we get from execution on
 Kubernetes using the Flink Kubernetes operator .. The boxed error comes
 from the fact that we complete a Promise with Success when it returns a
 JobExecutionResult and with Failure when we get an exception. And here we are
 getting an exception. So the real stack trace we have is the one below in
 Caused By.

 java.util.concurrent.ExecutionException: Boxed Error
 at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
 at
 scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
 at scala.concurrent.Promise.tryFailure(Promise.scala:112)
 at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
 at
 pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
 at
 pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
 at scala.util.Failure.fold(Try.scala:240)
 at
 pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
 at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
 at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
 at scala.util.Try$.apply(Try.scala:213)
 at pipelines.runner.Runner$.run(Runner.scala:43)
 at pipelines.runner.Runner$.main(Runner.scala:30)
 at pipelines.runner.Runner.main(Runner.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at
 

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Biao Liu
Wow, that's really cool! You have indeed done a lot of work. IMO it's
somewhat beyond the scope of the user mailing list.

Just one small concern, I'm not sure I have fully understood your way of
"tackle data skew by altering the way Flink partition keys using
KeyedStream".

From my understanding, key-groups are used for rescaling a job, for example
to reuse state after changing the parallelism of an operator.
I'm not sure whether you are heading in the right direction. It seems that
you are building on something deeper than the user interface. The user
interface is stable, while the implementation is not. It's usually not
recommended to base a feature on implementation details.

If you have strong reasons to change the implementation, I would suggest
starting a discussion on the dev mailing list. Maybe it could be supported
officially. What do you think?

Thanks,
Biao /'bɪ.aʊ/



On Mon, 23 Sep 2019 at 20:54, Felipe Gutierrez 
wrote:

>
> I've implemented a combiner [1] in Flink by extending
> OneInputStreamOperator. I call my operator using "transform".
> It works well, and I guess it would be useful to add this operator to
> DataStream.java. I just need to check further whether I have to touch other
> parts of the source code.
>
> But now I want to tackle data skew by altering the way Flink partition
> keys using KeyedStream.
>
> [1]
> https://felipeogutierrez.blogspot.com/2019/08/implementing-dynamic-combiner-mini.html
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Mon, Sep 23, 2019 at 2:37 PM Biao Liu  wrote:
>
>> Hi Felipe,
>>
>> If I understand correctly, you want to solve data skew caused by
>> imbalanced keys?
>>
>> There is a common strategy for this kind of problem: pre-aggregation,
>> like the combiner in MapReduce.
>> Sadly, AFAIK Flink does not support pre-aggregation currently, so I'm
>> afraid you have to implement it yourself.
>>
>> For example, introduce a function that caches some data (time- or
>> count-based). This function should sit before "keyBy", on a non-keyed
>> stream. It does the same pre-aggregation that the aggregation after
>> "keyBy" does. In this way, the skewed keyed data would be reduced a lot.
>>
>> I also found a suggestion [1] from Fabian, although it is from a long time ago.
>>
>> Hope it helps.
>>
>> 1.
>> https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> thanks Biao,
>>>
>>> I see. To achieve what I want to do I need to work with KeyedStream. I
>>> downloaded the Flink source code to learn and alter the KeyedStream to my
>>> needs. I am not sure, but it is a lot of work because, as far as I understood,
>>> the key-groups have to be predictable [1], and altering this touches a lot
>>> of other parts of the source code.
>>>
>>> However, if I guarantee that they (key-groups) are predictable, I will
>>> be able to rebalance or rescale the keys to other worker nodes.
>>>
>>> [1]
>>> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>>>
>>> Thanks,
>>> Felipe
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu  wrote:
>>>
>>>> Hi Felipe,
>>>>
>>>> Flink's job graph is a DAG. It seems that you set an "edge property"
>>>> (partitioner) several times.
>>>> Flink does not support multiple partitioners on one edge. The later one
>>>> overrides the earlier ones. That means the "keyBy" overrides the
>>>> "rebalance" and "partitionByPartial".
>>>>
>>>> You could insert some nodes between these partitioners to satisfy your
>>>> requirement. For example,
>>>> `sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
>>>>
>>>> Thanks,
>>>> Biao /'bɪ.aʊ/
>>>>
>>>>
>>>>
>>>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>
>

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Biao Liu
Hi Felipe,

If I understand correctly, you want to solve data skew caused by imbalanced
keys?

There is a common strategy for this kind of problem: pre-aggregation, like
the combiner in MapReduce.
Sadly, AFAIK Flink does not support pre-aggregation currently, so I'm
afraid you have to implement it yourself.

For example, introduce a function that caches some data (time- or
count-based). This function should sit before "keyBy", on a non-keyed
stream. It does the same pre-aggregation that the aggregation after
"keyBy" does. In this way, the skewed keyed data would be reduced a lot.
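
To make the idea concrete, here is a minimal count-based sketch in plain Java. It is not Flink API: the class name LocalPreAggregator and its methods are made up for illustration, and in a real job the buffer would also have to be flushed on a timer and on checkpoints so that no partial sums are lost.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical count-based local pre-aggregator, meant to run before keyBy. */
public class LocalPreAggregator {
    private final int maxBufferedRecords;
    private final Map<String, Long> partialSums = new HashMap<>();
    private int buffered = 0;

    public LocalPreAggregator(int maxBufferedRecords) {
        if (maxBufferedRecords <= 0) {
            throw new IllegalArgumentException("maxBufferedRecords must be positive");
        }
        this.maxBufferedRecords = maxBufferedRecords;
    }

    /** Buffers one record; returns the partial sums to forward once the buffer is full. */
    public List<Map.Entry<String, Long>> add(String key, long value) {
        partialSums.merge(key, value, Long::sum);
        buffered++;
        if (buffered >= maxBufferedRecords) {
            return flush();
        }
        return Collections.emptyList();
    }

    /** Emits one (key, partialSum) record per buffered key and resets the buffer. */
    public List<Map.Entry<String, Long>> flush() {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : partialSums.entrySet()) {
            out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        partialSums.clear();
        buffered = 0;
        return out;
    }

    public static void main(String[] args) {
        // Skewed input: key "a" dominates.
        String[] keys = {"a", "a", "a", "a", "b", "a", "a", "a", "a", "b"};
        LocalPreAggregator agg = new LocalPreAggregator(5);
        int forwarded = 0;
        for (String key : keys) {
            forwarded += agg.add(key, 1L).size();
        }
        forwarded += agg.flush().size(); // flush whatever is left at the end
        System.out.println("input records: " + keys.length + ", forwarded records: " + forwarded);
    }
}
```

With this input the ten records collapse into four partial sums, so the operator behind "keyBy" that owns the hot key receives far fewer records. The final aggregation after "keyBy" still has to add the partial sums up.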

I also found a suggestion [1] from Fabian, although it is from a long time ago.

Hope it helps.

1.
https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation

Thanks,
Biao /'bɪ.aʊ/



On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez 
wrote:

> thanks Biao,
>
> I see. To achieve what I want to do I need to work with KeyedStream. I
> downloaded the Flink source code to learn and alter the KeyedStream to my
> needs. I am not sure, but it is a lot of work because, as far as I understood,
> the key-groups have to be predictable [1], and altering this touches a lot
> of other parts of the source code.
>
> However, if I guarantee that they (key-groups) are predictable, I will be
> able to rebalance or rescale the keys to other worker nodes.
>
> [1]
> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>
> Thanks,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Mon, Sep 23, 2019 at 9:51 AM Biao Liu  wrote:
>
>> Hi Felipe,
>>
>> Flink's job graph is a DAG. It seems that you set an "edge property"
>> (partitioner) several times.
>> Flink does not support multiple partitioners on one edge. The later one
>> overrides the earlier ones. That means the "keyBy" overrides the
>> "rebalance" and "partitionByPartial".
>>
>> You could insert some nodes between these partitioners to satisfy your
>> requirement. For example,
>> `sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> I am executing a data stream application which uses rebalance. Basically
>>> I am counting words using "src -> split -> physicalPartitionStrategy ->
>>> keyBy -> sum -> print". I am running 3 examples, one without physical
>>> partition strategy, one with rebalance strategy [1], and one with
>>> partial partition strategy from [2].
>>> I know that the keyBy operator actually kills what rebalance is doing
>>> because it splits the stream by key and if I have a stream with skewed key,
>>> one parallel instance of the operator after the keyBy will be overloaded.
>>> However, I was expecting that *before the keyBy* I would have a
>>> balanced stream, which is not happening.
>>>
>>> Basically, I want to see the difference in records/sec between operators
>>> when I use rebalance or any other physical partition strategy. However,
>>> I found no difference in the records/sec metrics of any operator when
>>> running the 3 different physical partition strategies. Screenshots of
>>> Prometheus+Grafana are attached.
>>>
>>> Maybe I am measuring the wrong operator, or maybe I am not using the
>>> rebalance in the right way, or I am not doing a good use case to test the
>>> rebalance transformation.
>>> I am also testing a different physical partition to later try to
>>> implement the issue "FLINK-1725 New Partitioner for better load balancing
>>> for skewed data" [2]. I am not sure, but I guess that all physical
>>> partition strategies have to be implemented on a KeyedStream.
>>>
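>>> DataStream<String> text = env.addSource(new WordSource());
>>> // split lines into words
>>> // (element types were lost in the archive; String and
>>> // Tuple2<String, Integer> are assumed from the keyBy(0).sum(1) call)
>>> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new Tokenizer());
>>> // choose ONE partitioning strategy per run
>>> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer;
>>> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.rebalance();
>>> // partitionByPartial is the custom partial partitioner based on [2]
>>> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.partitionByPartial(0);
>>> // count
>>> partitionedStream.keyBy(0).sum(1).print();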
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
>>> [2] https://issues.apache.org/jira/browse/FLINK-1725
>>>
>>> thanks,
>>> Felipe
>>>
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>


Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Biao Liu
Hi Felipe,

Flink's job graph is a DAG. It seems that you set an "edge property"
(partitioner) several times.
Flink does not support multiple partitioners on one edge. The later one
overrides the earlier ones. That means the "keyBy" overrides the "rebalance"
and "partitionByPartial".

You could insert some nodes between these partitioners to satisfy your
requirement. For example,
`sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez 
wrote:

> I am executing a data stream application which uses rebalance. Basically I
> am counting words using "src -> split -> physicalPartitionStrategy -> keyBy
> -> sum -> print". I am running 3 examples, one without physical
> partition strategy, one with rebalance strategy [1], and one with
> partial partition strategy from [2].
> I know that the keyBy operator actually kills what rebalance is doing
> because it splits the stream by key and if I have a stream with skewed key,
> one parallel instance of the operator after the keyBy will be overloaded.
> However, I was expecting that *before the keyBy* I would have a balanced
> stream, which is not happening.
>
> Basically, I want to see the difference in records/sec between operators
> when I use rebalance or any other physical partition strategy. However,
> I found no difference in the records/sec metrics of any operator when
> running the 3 different physical partition strategies. Screenshots of
> Prometheus+Grafana are attached.
>
> Maybe I am measuring the wrong operator, or maybe I am not using the
> rebalance in the right way, or I am not doing a good use case to test the
> rebalance transformation.
> I am also testing a different physical partition to later try to implement
> the issue "FLINK-1725 New Partitioner for better load balancing for skewed
> data" [2]. I am not sure, but I guess that all physical partition
> strategies have to be implemented on a KeyedStream.
>
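> DataStream<String> text = env.addSource(new WordSource());
> // split lines into words
> // (element types were lost in the archive; String and
> // Tuple2<String, Integer> are assumed from the keyBy(0).sum(1) call)
> DataStream<Tuple2<String, Integer>> tokenizer = text.flatMap(new Tokenizer());
> // choose ONE partitioning strategy per run
> DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer;
> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.rebalance();
> // partitionByPartial is the custom partial partitioner based on [2]
> // DataStream<Tuple2<String, Integer>> partitionedStream = tokenizer.partitionByPartial(0);
> // count
> partitionedStream.keyBy(0).sum(1).print();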
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning
> [2] https://issues.apache.org/jira/browse/FLINK-1725
>
> thanks,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>


Re: About exactly-once in Async I/O

2019-09-23 Thread Biao Liu
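1. First, the scenario you describe is not specific to the Async I/O operator; other operators have the same issue.
2. Flink's exactly-once guarantee applies to Flink's internals, such as state [1].
3. If you also want exactly-once semantics with respect to external systems, the corresponding connector has to support it [2].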

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/internals/stream_checkpointing.html
2.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/guarantees.html
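
As a rough illustration of why idempotent writes make replays harmless, here is a small plain-Java sketch. It does not use any Flink API; AppendingStore and UpsertStore are hypothetical stand-ins for an external database, and the request ids follow the a..f example from the question.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IdempotentReplayDemo {

    /** Non-idempotent stand-in: every request adds to a running total. */
    static class AppendingStore {
        long total = 0;

        void apply(String requestId, long amount) {
            total += amount;
        }
    }

    /** Idempotent stand-in: the effect is keyed by request id, so a replay overwrites itself. */
    static class UpsertStore {
        final Map<String, Long> amountByRequest = new HashMap<>();

        void apply(String requestId, long amount) {
            amountByRequest.put(requestId, amount);
        }

        long total() {
            long sum = 0;
            for (long amount : amountByRequest.values()) {
                sum += amount;
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        // Requests that reached the external system before the job was cancelled.
        List<String> beforeFailure = Arrays.asList("a", "b", "c", "d");
        // Requests re-triggered after restoring from the last completed checkpoint.
        List<String> replayed = Arrays.asList("c", "d", "e", "f");

        AppendingStore appending = new AppendingStore();
        UpsertStore upsert = new UpsertStore();
        for (String id : beforeFailure) {
            appending.apply(id, 1L);
            upsert.apply(id, 1L);
        }
        for (String id : replayed) {
            appending.apply(id, 1L);
            upsert.apply(id, 1L);
        }

        System.out.println("appending store total: " + appending.total);
        System.out.println("upsert store total:    " + upsert.total());
    }
}
```

The appending store counts c and d twice (total 8), while the upsert store ends with exactly six effects. Inside Flink the requests are at-least-once from the point of view of the external system; the external write has to be idempotent or transactional to make the end result exactly-once.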

Thanks,
Biao /'bɪ.aʊ/



On Wed, 4 Sep 2019 at 12:58, star <3149768...@qq.com> wrote:

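> Thanks for your reply. Async I/O can only resend failed asynchronous
> requests; it cannot guarantee that each request is sent only once, so
> shouldn't that be at-least-once? From the documentation: The
> asynchronous I/O operator offers full exactly-once fault tolerance
> guarantees. It stores the records for in-flight asynchronous requests in
> checkpoints and restores/re-triggers the requests when recovering from a
> failure.
>
> Sent from my iPhone
>
>
> -- Original Message --
> From: Dino Zhang  Sent: 4 Sep 2019 09:20
> To: user-zh  Subject: Re: About exactly-once in Async I/O
>
>
>
> hi star,
>
> exactly-once refers to Flink's internals. To guarantee end-to-end
> exactly-once you can use two-phase commit, which requires implementing
> TwoPhaseCommitSinkFunction, or make the processing idempotent.
>
> On Wed, Sep 4, 2019 at 8:20 AM star <3149768...@qq.com> wrote:
>
>> From reading the documentation, my understanding is that in-flight
>> asynchronous requests are stored in the checkpoint and re-triggered on
>> failover. My question is: since the requests are re-triggered without any
>> rollback, the earlier requests have already affected the external system,
>> so isn't that at-least-once?
>> For example, ck1 sends three requests a, b, c to update an external
>> database, and ck2 sends d, e, f. Suppose ck1 has completed its checkpoint;
>> a and b succeeded, c did not.
>>
>> ck2 is executing e when the job is cancelled, but c and d have already
>> succeeded. When I restart from the latest successful checkpoint ck1,
>> won't c and d be requested again?
>>
>> Thanks
>>
>> Sent from my iPhone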
>
>
>
> --
> Regards,
> DinoZhang


Re: Running flink examples

2019-09-20 Thread Biao Liu
Hi RAMALINGESWARA,

Are you sure it's reading your input data correctly? I'm asking because the
default input data (which is used if no input data is provided) has just 15
elements.

Actually the default number of iterations is 10. You could pass the parameter
"--iterations $the_number_you_wanted" to change the default behavior.

I'm not sure whether this is exactly what you want.
You can find the source code of this example here [1]. Maybe it helps.

1.
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java

Thanks,
Biao /'bɪ.aʊ/



On Fri, 20 Sep 2019 at 01:02, RAMALINGESWARA RAO THOTTEMPUDI <
tr...@iitkgp.ac.in> wrote:

> Respected Sir,
>
> It is reading the data but giving only 15 results. for example for
> pagerank only it is giving 15 elements node ranking. But I  need each node
> ranking for 2 nodes.
>
> ------
> *From: *"Biao Liu" 
> *To: *"Vijay Bhaskar" 
> *Cc: *"RAMALINGESWARA RAO THOTTEMPUDI" , "user" <
> user@flink.apache.org>
> *Sent: *Thursday, September 19, 2019 1:06:53 PM
> *Subject: *Re: Running flink examples
>
> Hi,
> I guess the specific input (--input /path/to/input) didn't work.
> I just checked the PageRank example program, it accepts "--pages" and
> "--links" as input parameters.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, 19 Sep 2019 at 14:56, Vijay Bhaskar 
> wrote:
>
>> Can you check whether it's able to read the supplied input file properly
>> or not?
>> Regards
>> Bhaskar
>>
>> On Wed, Sep 18, 2019 at 1:07 PM RAMALINGESWARA RAO THOTTEMPUDI <
>> tr...@iitkgp.ac.in> wrote:
>>
>>>  Hi Sir,
>>>
>>> I am trying to run the Flink programs, particularly PageRank.
>>>
>>> I have used the following command :
>>>
>>> ./bin/flink run -d ./examples/batch/PageRank.jar --input /path/to/input
>>>
>>> It is running but it is showing only 15 elements ranking for my data.
>>> But I need to find the ranking of all elements of my data.
>>> Because the original program  is running only for fixed number of
>>> iterations which is 15. How can I modify to run for full data elements.
>>>
>>> I have to change the value of fixed number of iterations.
>>>
>>>
>>>
>>> Thanking You,
>>>
>>> TR RAO
>>>
>>
>


Re: Client for Monitoring API!

2019-09-20 Thread Biao Liu
Ah, now I understand what exactly your requirement is.

I don't think there is a tool in Flink that fetches and stores the content of
the REST API for you. It does not seem to be a common requirement.
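
If you end up writing it yourself, a small poller is not much code. Below is a rough sketch in plain Java, not an existing Flink client: the class name, the output file and the base URL are assumptions, and the two paths are the checkpoint endpoints of the monitoring REST API as I remember them, so please verify them against the REST API docs of your Flink version.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper that archives checkpoint statistics from the REST API. */
public class CheckpointHistoryArchiver {

    /** Checkpoint overview and history of one job. */
    static String checkpointsUrl(String restBase, String jobId) {
        return restBase + "/jobs/" + jobId + "/checkpoints";
    }

    /** Subtask-level statistics of one checkpoint for one job vertex. */
    static String subtaskDetailsUrl(String restBase, String jobId, long checkpointId, String vertexId) {
        return restBase + "/jobs/" + jobId + "/checkpoints/details/" + checkpointId
                + "/subtasks/" + vertexId;
    }

    /** Performs a GET request and returns the response body as a string. */
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    /** Appends one JSON document as a single line, so the file survives job restarts. */
    static void appendLine(Path file, String json) throws IOException {
        String line = json.replace("\n", " ") + System.lineSeparator();
        Files.write(file, line.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: CheckpointHistoryArchiver <restBase> <jobId>");
            return;
        }
        // Example arguments (assumed): http://localhost:8081 <your job id>
        String json = fetch(checkpointsUrl(args[0], args[1]));
        appendLine(Paths.get("checkpoint-history.jsonl"), json);
    }
}
```

Running this periodically (cron or a scheduled executor) keeps a history outside the job manager. The subtask-level URL needs a checkpoint id and a vertex id, which you would take from the overview response.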

But I'm really interested in the motivation behind your requirement. Could
you share more about it?

Why do you want to keep the history of checkpointing at subtask level? How
do you use this history?


Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Sep 2019 at 16:12, Anis Nasir  wrote:

> Thanks Biao for your response.
>
> We would like to fetch metrics at subtask level for each checkpoint. This
> information is not exposed via default metrics, but are available in rest
> end point!
>
> Also, would like to persist the history of checkpoints. This information
> is lost whenever we restart the job (or job manager is lost).
>
>
> Cheers,
> Anis
>
>
>
> On Thu, Sep 19, 2019 at 5:02 AM Biao Liu  wrote:
>
>> Hi Anis,
>>
>> Have you tried a Flink metric reporter? It's a better way to handle metrics
>> than going through the REST API.
>> Flink supports reporting metrics to external systems. You can find the
>> list of supported external systems here [1].
>>
>> 1.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Wed, 18 Sep 2019 at 19:36, Anis Nasir  wrote:
>>
>>> Hey all,
>>>
>>> Is there any client library that we can use to fetch/store the metrics
>>> exposed through the Flink monitoring REST API?
>>>
>>>
>>> Regards,
>>> Anis
>>>
>>


Re: Running flink examples

2019-09-19 Thread Biao Liu
Hi,

I guess the specific input (--input /path/to/input) didn't work.
I just checked the PageRank example program, it accepts "--pages" and
"--links" as input parameters.

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Sep 2019 at 14:56, Vijay Bhaskar 
wrote:

> Can you check whether it's able to read the supplied input file properly or
> not?
>
> Regards
> Bhaskar
>
> On Wed, Sep 18, 2019 at 1:07 PM RAMALINGESWARA RAO THOTTEMPUDI <
> tr...@iitkgp.ac.in> wrote:
>
>>  Hi Sir,
>>
>> I am trying to run the Flink programs, particularly PageRank.
>>
>> I have used the following command :
>>
>> ./bin/flink run -d ./examples/batch/PageRank.jar --input /path/to/input
>>
>> It is running but it is showing only 15 elements ranking for my data. But
>> I need to find the ranking of all elements of my data.
>> Because the original program  is running only for fixed number of
>> iterations which is 15. How can I modify to run for full data elements.
>>
>> I have to change the value of fixed number of iterations.
>>
>>
>>
>> Thanking You,
>>
>> TR RAO
>>
>


Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Biao Liu
y(Try.scala:213)
>>>> at pipelines.runner.Runner$.run(Runner.scala:43)
>>>> at pipelines.runner.Runner$.main(Runner.scala:30)
>>>> at pipelines.runner.Runner.main(Runner.scala)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>> Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>> at
>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>> at
>>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>> at
>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>> at
>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.io.NotSerializableException:
>>>> org.apache.avro.Schema$Field
>>>> at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>> at
>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>> at java.util.ArrayList.writeObject(ArrayList.java:766)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>> Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>>>> at
>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>> at
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>> at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>> at
>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
>>>> at
>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
>>>>
>>>>
>>>> I also tried the following ..
>>>>
>>>> class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide,
>>>> TaxiFare, TaxiRideFare] {
>>>>
>>>> @transient var rideState: ValueState[TaxiRide] = null
>>>> @transient var fareState: ValueState[TaxiFare] = null
>>>>
>>>> override def open(params: Configuration): Unit = {
>>>>   super.open(params)
>>>>   rideState = getRuntimeContext.getState(
>>>> new ValueStateDescriptor[TaxiRide]("saved ride",
>>>> classOf[TaxiRide]))
>>>>   fareState = getRuntimeContext.getState(
>>>> new ValueStateDescriptor[TaxiFare]("saved fare",
>>>> classOf[TaxiFare]))
>>>> }
>>>>
>>>> and moved the state initialization to open function. But still get the
>>>> same result.
>>>>
>>>> Help ?
>>>>
>>>> regards.
>>>>
>>>>
>>>>
>>&

Re: Client for Monitoring API!

2019-09-18 Thread Biao Liu
Hi Anis,

Have you tried a Flink metric reporter? It's a better way to handle metrics
than going through the REST API.
Flink supports reporting metrics to external systems. You can find the
list of supported external systems here [1].

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Sep 2019 at 19:36, Anis Nasir  wrote:

> Hey all,
>
> Is there any client library that we can use to fetch/store the metrics
> exposed through the Flink monitoring REST API?
>
>
> Regards,
> Anis
>


Re: serialization issue in streaming job run with scala Future

2019-09-17 Thread Biao Liu
Hi Debasish,

I guess the reason is that something is unexpectedly pulled into serialization
through a reference from an inner class (anonymous class or lambda expression).
When Flink serializes this inner class instance, it also serializes all
referenced objects, for example the outer class instance. If the outer
class is not serializable, this error happens.

You could try moving that piece of code to a named, non-inner class.
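
Here is a small plain-Java sketch of the effect, without any Flink classes. MapFn, JobBuilder and PrefixFn are made-up names for illustration; the check uses plain Java serialization, which is the mechanism that raises the NotSerializableException in your stack trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    /** Stand-in for a user function interface; hypothetical, not a Flink type. */
    interface MapFn extends Serializable {
        String map(String value);
    }

    /** Outer class that is NOT serializable, like a class that builds the job. */
    static class JobBuilder {
        private final String prefix;

        JobBuilder(String prefix) {
            this.prefix = prefix;
        }

        /** Anonymous inner class: keeps a hidden reference to the JobBuilder instance. */
        MapFn innerFunction() {
            return new MapFn() {
                @Override
                public String map(String value) {
                    return prefix + value; // reads a field of the outer instance
                }
            };
        }

        /** Named static class: carries only the values passed to its constructor. */
        MapFn standaloneFunction() {
            return new PrefixFn(prefix);
        }
    }

    static class PrefixFn implements MapFn {
        private final String prefix;

        PrefixFn(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String map(String value) {
            return prefix + value;
        }
    }

    /** Returns true if Java serialization accepts the object graph. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        JobBuilder builder = new JobBuilder("ride-");
        System.out.println("inner class serializable:      " + isSerializable(builder.innerFunction()));
        System.out.println("standalone class serializable: " + isSerializable(builder.standaloneFunction()));
    }
}
```

The anonymous function fails because serializing it drags in the enclosing JobBuilder, while the standalone class serializes fine. In your case the non-serializable object is org.apache.avro.Schema$Field, so it is worth checking which enclosing object or captured value holds an Avro schema.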

Thanks,
Biao /'bɪ.aʊ/



On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh 
wrote:

> My main question is why serialisation kicks in when I try to execute
> within a `Future` and not otherwise.
>
> regards.
>
> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh 
> wrote:
>
>> Yes, they are generated from Avro Schema and implement Serializable ..
>>
>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma 
>> wrote:
>>
>>> Does TaxiRide or TaxiRideFare implement Serializable?
>>>
>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh 
>>> wrote:
>>>
 Hello -

 The following piece of code is an example of connected data streams ..

 val rides: DataStream[TaxiRide] =
   readStream(inTaxiRide)
 .filter { ride ⇒ ride.getIsStart().booleanValue }
 .keyBy("rideId")

 val fares: DataStream[TaxiFare] =
   readStream(inTaxiFare)
 .keyBy("rideId")

 val processed: DataStream[TaxiRideFare] =
   rides
 .connect(fares)
 .flatMap(new EnrichmentFunction)

 When I execute the above logic using
 StreamExecutionEnvironment.execute(..) it runs fine.
 But if I try to execute the above from within a scala.concurrent.Future,
 I get the following exception ..

 org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
 pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
 type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
 startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
 pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
 serializable. The object probably contains or references non serializable
 fields.
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
 at
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
 at
 org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
 at
 org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
 at
 org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
   ...

 Caused by: java.io.NotSerializableException:
 org.apache.avro.Schema$Field
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

 Any thoughts why this may happen ?

 regards.

 --
 Debasish Ghosh
 http://manning.com/ghosh2
 http://manning.com/ghosh

 Twttr: @debasishg
 Blog: http://debasishg.blogspot.com
 Code: http://github.com/debasishg

>>>
>>>
>>> --
>>> Thanks
>>> Deepak
>>> www.bigdatabig.com
>>> www.keosha.net
>>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
> --
> Sent from my iPhone
>


Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread Biao Liu
Congrats Zili!

Thanks,
Biao /'bɪ.aʊ/



On Wed, 11 Sep 2019 at 18:43, Oytun Tez  wrote:

> Congratulations!
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Wed, Sep 11, 2019 at 6:36 AM bupt_ljy  wrote:
>
>> Congratulations!
>>
>>
>> Best,
>>
>> Jiayi Liao
>>
>>  Original Message
>> *Sender:* Till Rohrmann
>> *Recipient:* dev; user
>> *Date:* Wednesday, Sep 11, 2019 17:22
>> *Subject:* [ANNOUNCE] Zili Chen becomes a Flink committer
>>
>> Hi everyone,
>>
>> I'm very happy to announce that Zili Chen (some of you might also know
>> him as Tison Kun) accepted the offer of the Flink PMC to become a committer
>> of the Flink project.
>>
>> Zili Chen has been an active community member for almost 16 months now.
>> He helped pushing the Flip-6 effort over the finish line, ported a lot of
>> legacy code tests, removed a good part of the legacy code, contributed
>> numerous fixes, is involved in the Flink's client API refactoring, drives
>> the refactoring of Flink's HighAvailabilityServices and much more. Zili
>> Chen also helped the community by PR reviews, reporting Flink issues,
>> answering user mails and being very active on the dev mailing list.
>>
>> Congratulations Zili Chen!
>>
>> Best, Till
>> (on behalf of the Flink PMC)
>>
>


Re: kinesis table connector support

2019-09-02 Thread Biao Liu
Hi Fanbin,

I'm not familiar with the table module. Maybe someone else could help.

@jincheng sun
Do you know whether there is any plan for a Kinesis table connector?

Thanks,
Biao /'bɪ.aʊ/



On Sat, 24 Aug 2019 at 02:26, Fanbin Bu  wrote:

> Hi,
>
> It looks like the Flink table connectors do not include `kinesis` (only
> FileSystem, Kafka and ES); see
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#table-connectors
> I also found some examples for Kafka:
> https://eventador.io/blog/flink_table_and_sql_api_with_apache_flink_16/
> I'm wondering whether there is such a thing for Kinesis as well.
>
> Is there any plan to support this in the future? Otherwise, what needs to
> be done if I want to implement it on my own?
>
> Basically, I have a Kinesis stream that emits JSON string data, and I would
> like to use the Flink Table/SQL API to do the streaming/batch processing.
> Currently, I'm using the DataStream API, which is not as flexible.
>
> Any help would be appreciated.
>
> Thanks,
> Fanbin
>


Re: checkpoint failure suddenly even state size is into 10 mb around

2019-09-02 Thread Biao Liu
Hi Sushant,

Your screenshot shows that the checkpoint expired, which means it did not
finish in time.
My guess is that heavy back pressure blocked both the data and the barriers,
but I can't tell what caused the back pressure.

If this happens again, pay close attention to the tasks that cause the
back pressure.
The task manager log, the GC log, and tools like jstack might help.

Thanks,
Biao /'bɪ.aʊ/



On Fri, 23 Aug 2019 at 15:27, Sushant Sawant 
wrote:

> Hi all,
> I'm facing two issues which I believe are correlated.
> 1. The Kafka source shows high back pressure.
> 2. Checkpoints suddenly failed for an entire day, until a restart.
>
> My job does the following:
> a. Reads from Kafka
> b. Makes async I/O calls to an external system
> c. Writes to Cassandra and Elasticsearch
>
> Checkpointing uses the file system.
> This Flink job has been proven under high load,
> around 5000/sec throughput.
> But recently we scaled down the parallelism, since there wasn't any load in
> production, and these issues started.
>
> Please find the status shown by the Flink dashboard.
> This GitHub folder contains images from when there was high back pressure and
> checkpoint failure:
>
> https://github.com/sushantbprise/flink-dashboard/tree/master/failed-checkpointing
> and after the restart, the "everything is fine" images are in this folder:
>
> https://github.com/sushantbprise/flink-dashboard/tree/master/working-checkpointing
>
> --
> Could anyone point me in the right direction as to what went wrong and how
> to troubleshoot it?
>
>
> Thanks & Regards,
> Sushant Sawant
>


Re: Is it possible to register a custom TypeInfoFactory without using an annotation?

2019-09-02 Thread Biao Liu
Hi,

Java supports customization of serializer/deserializer, see [1]. Could it
satisfy your requirement?

1. https://stackoverflow.com/questions/7290777/java-custom-serialization
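
As a rough sketch of the custom Java serialization that [1] describes, the example below wraps a class that cannot be modified and writes its fields by hand through `writeObject`/`readObject`. `LegacyPoint`, `SerializablePoint` and `CustomSerializationDemo` are hypothetical names; this is plain JDK serialization only, and whether it satisfies the Flink-side requirement is not confirmed in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical third-party POJO: not Serializable and assumed unmodifiable.
final class LegacyPoint {
    final int x;
    final int y;

    LegacyPoint(int x, int y) {
        this.x = x;
        this.y = y;
    }
}

// Wrapper that takes over serialization of the wrapped POJO.
class SerializablePoint implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: default serialization skips this field, we write it by hand.
    private transient LegacyPoint point;

    SerializablePoint(LegacyPoint point) {
        this.point = point;
    }

    LegacyPoint get() {
        return point;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeInt(point.x);
        out.writeInt(point.y);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        int x = in.readInt();
        int y = in.readInt();
        point = new LegacyPoint(x, y);
    }
}

class CustomSerializationDemo {
    // Serializes the wrapper to bytes and reads it back.
    static SerializablePoint roundTrip(SerializablePoint original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializablePoint) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LegacyPoint copy = roundTrip(new SerializablePoint(new LegacyPoint(3, 4))).get();
        System.out.println(copy.x + "," + copy.y);
    }
}
```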

Thanks,
Biao /'bɪ.aʊ/



On Mon, 26 Aug 2019 at 16:34, 杨力  wrote:

> I'd like to provide a custom serializer for a POJO class. But that class
> cannot be modified so it's not possible to add a @TypeInfo annotation to
> it. Are there any other ways to register one?
>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Biao Liu
Great news!

Thank you Gordon & Kurt for being the release managers!
Thanks to all contributors who worked on this release!

Thanks,
Biao /'bɪ.aʊ/



On Thu, 22 Aug 2019 at 21:14, Paul Lam  wrote:

> Well done! Thanks to everyone who contributed to the release!
>
> Best,
> Paul Lam
>
> On Thu, 22 Aug 2019 at 9:03 PM, Yu Li wrote:
>
>> Thanks for the update Gordon, and congratulations!
>>
>> Great thanks to all for making this release possible, especially to our
>> release managers!
>>
>> Best Regards,
>> Yu
>>
>>
>> On Thu, 22 Aug 2019 at 14:55, Xintong Song  wrote:
>>
>>> Congratulations!
>>> Thanks Gordon and Kurt for being the release managers, and thanks all
>>> the contributors.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, Aug 22, 2019 at 2:39 PM Yun Gao  wrote:
>>>
>>>> Congratulations!
>>>>
>>>> Many thanks to Gordon and Kurt for managing the release, and many
>>>> thanks to everyone for their contributions!
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>> --
>>>> From: Zhu Zhu
>>>> Send Time: 2019 Aug. 22 (Thu.) 20:18
>>>> To: Eliza
>>>> Cc: user
>>>> Subject: Re: [ANNOUNCE] Apache Flink 1.9.0 released
>>>>
>>>> Thanks Gordon for the update.
>>>> Congratulations that we have Flink 1.9.0 released!
>>>> Thanks to all the contributors.
>>>>
>>>> Thanks,
>>>> Zhu Zhu
>>>>
>>>> On Thu, 22 Aug 2019 at 8:10 PM, Eliza wrote:
>>>>
>>>> On 2019/8/22 (Thu) 8:03 PM, Tzu-Li (Gordon) Tai wrote:
>>>> > The Apache Flink community is very happy to announce the release of
>>>> > Apache Flink 1.9.0, which is the latest major release.
>>>>
>>>> Congratulations and thanks~
>>>>
>>>> regards.





Re: build Flink master brach fail due to npm

2019-08-20 Thread Biao Liu
Hi,

Could you try adding "-Dskip.npm" when building the project?

This is just a workaround that skips the npm installation. I don't
have much experience with npm, though. Maybe someone else could help.

Thanks,
Biao /'bɪ.aʊ/



On Tue, 20 Aug 2019 at 16:23, 刘波 <1091643...@qq.com> wrote:

> Hi, users!
>
> When I build the Flink master branch from GitHub, it fails in
> the flink-runtime-web module.
>   nodejs v10.15.2
>   npm    6.10.3
>   Java   openjdk version "11.0.4" 2019-07-16
>   maven  3.6.0
>
> Has anyone had the same problem? Any suggestions to resolve it? Thanks~
>
>
> [INFO]
> [INFO] --- maven-remote-resources-plugin:1.5:process
> (process-resource-bundles) @ flink-runtime-web_2.11 ---
> [INFO]
> [INFO] --- frontend-maven-plugin:1.6:install-node-and-npm (install node
> and npm) @ flink-runtime-web_2.11 ---
> [INFO] Node v10.9.0 is already installed.
> [INFO]
> [INFO] --- frontend-maven-plugin:1.6:npm (npm install) @
> flink-runtime-web_2.11 ---
> [INFO] Running 'npm ci --cache-max=0 --no-save' in
> /home/ben/work/flink/flink-runtime-web/web-dashboard
> [WARNING] npm WARN prepare removing existing node_modules/ before
> installation
> [INFO]
> [INFO] > fsevents@1.2.7 install
> /home/ben/work/flink/flink-runtime-web/web-dashboard/node_modules/fsevents
> [INFO] > node install
> [INFO]
> [ERROR] Aborted (core dumped)
> [INFO]
> [INFO] > node-sass@4.11.0 install
> /home/ben/work/flink/flink-runtime-web/web-dashboard/node_modules/node-sass
> [INFO] > node scripts/install.js
> [INFO]
> [INFO] Cached binary found at
> /home/ben/.npm/node-sass/4.11.0/linux-x64-64_binding.node
> [ERROR] Aborted (core dumped)
> [INFO]
> [INFO] > husky@1.3.1 install
> /home/ben/work/flink/flink-runtime-web/web-dashboard/node_modules/husky
> [INFO] > node husky install
> [INFO]
> [INFO] husky > setting up git hooks
> [INFO] HUSKY_SKIP_INSTALL environment variable is set to 'true', skipping
> Git hooks installation.
> [ERROR] Aborted (core dumped)
> [ERROR] npm ERR! code ELIFECYCLE
> [ERROR] npm ERR! errno 134
> [ERROR] npm ERR! husky@1.3.1 install: `node husky install`
> [ERROR] npm ERR! Exit status 134
> [ERROR] npm ERR!
> [ERROR] npm ERR! Failed at the husky@1.3.1 install script.
> [ERROR] npm ERR! This is probably not a problem with npm. There is likely
> additional logging output above.
> [ERROR]
> [ERROR] npm ERR! A complete log of this run can be found in:
> [ERROR] npm ERR!
>  /home/ben/.npm/_logs/2019-08-20T07_59_08_468Z-debug.log
> [INFO]
> 
>



Re: Recovery from job manager crash using check points

2019-08-19 Thread Biao Liu
Hi Min,

> Do I need to set up zookeepers to keep the states when a job manager
> crashes?

I guess you need to set up HA [1] properly. Besides that, I would
suggest that you also check the state backend [2].

1.
https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html
2.
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/state_backends.html

Thanks,
Biao /'bɪ.aʊ/



On Mon, 19 Aug 2019 at 23:28,  wrote:

> Hi,
>
>
>
> I can use checkpoints to recover Flink state when a task manager crashes.
>
>
>
> I cannot use checkpoints to recover Flink state when a job manager
> crashes.
>
>
>
> Do I need to set up zookeepers to keep the states when a job manager
> crashes?
>
>
>
> Regards
>
>
>
> Min
>
>
>


Re: Flink job parallelism

2019-08-16 Thread Biao Liu
Hi Vishwas,

Regardless of the available task managers, what does your job actually look
like? Does it run with parallelism 2 or 1?

It's hard to say what happened based on your description. Could you
reproduce the scenario?
If so, could you provide more details, such as a
screenshot, logs, and the code of your example?

Thanks,
Biao /'bɪ.aʊ/



On Thu, 15 Aug 2019 at 23:54, Vishwas Siravara  wrote:

> Hi guys,
> I have a flink job which I want to run with a parallelism of 2.
>
> I run it from command line like : flink run -p 2 -C
> file:///home/was/classpathconfig/ -c com.visa.flink.cli.Main
> flink-job-assembly-0.1-SNAPSHOT.jar flink druid
>
> My cluster has two task managers with only 1 task slot each.
> However when I look at the Web UI for my job , I see that one of the task
> managers is still available. But when I submit with the web UI , both the
> task managers are used for this job and I get a parallelism of 2.
>
> Can you help me with understanding as to why this happens ?
>
> Thank you
> Vishwas
>


Re: Flink metrics scope for YARN single job

2019-08-14 Thread Biao Liu
Hi Vasily,

> Is there any way to distinguish logs from different JobManager running on
> same host?

Do you mean "distinguish metrics from different JobManagers running on the
same host"?
I guess there is no other variable you could use for now.

But I think it's reasonable to support this requirement. I would like to
discuss it with the devs to hear their opinions, and will get back to you if
we reach a conclusion.

Thanks,
Biao /'bɪ.aʊ/



On Wed, 14 Aug 2019 at 19:46, Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi,
> I want to run Flink apps on YARN in single job mode and keep metrics in
> Graphite. But as i see, the only variable i can use for JobManager scope
> customization is :
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#list-of-all-variables
>
> Is there any way to distinguish logs from different JobManager running on
> same host?
>
>
> Thanks in advance.
>


Re: External classpath

2019-08-14 Thread Biao Liu
Hi Vishwas,

> Does my external class path have to be on NFS share ? Can I not have the
> config directory on each machine in the same location ?

It can be local files on each machine.

> -C file://home/was/classpathconfig/

I guess the problem is that the URI of your local directory is malformed.
With only two slashes, "home" is parsed as the host rather than as part of
the path. Try "-C file:///home/was/classpathconfig/" instead.
BTW, if it's a directory, it must end with '/' [1].
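
The difference between the two forms can be checked with `java.net.URI` alone. The sketch below is only an illustration of how the JDK parses the two strings; `FileUriDemo` and `describe` are names made up for the example, and it says nothing about how the Flink CLI validates the -C argument.

```java
import java.net.URI;

class FileUriDemo {
    // Shows which part of the URI string ends up as host and which as path.
    static String describe(String uri) {
        URI parsed = URI.create(uri);
        return "host=" + parsed.getHost() + " path=" + parsed.getPath();
    }

    public static void main(String[] args) {
        // Two slashes: "home" becomes the host, the path loses its first segment.
        System.out.println(describe("file://home/was/classpathconfig/"));
        // Three slashes: empty authority, the full local path is kept.
        System.out.println(describe("file:///home/was/classpathconfig/"));
    }
}
```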

> Also I tried to put my files in s3 and tried to run flink run -C s3://
> flink.dev/config -c com.visa.flink.cli.Main
> flink-job-assembly-0.1-SNAPSHOT.jar flink druid

Currently, -C does not support S3 or other distributed file systems.

1. https://docs.oracle.com/javase/8/docs/api/java/net/URLClassLoader.html

Thanks,
Biao /'bɪ.aʊ/



On Thu, 15 Aug 2019 at 07:51, Vishwas Siravara  wrote:

> Hi guys,
> I'm very close to deploying my application in production, so I am trying to
> externalize some of the config files, which have to be available on the
> classpath when I run my application via the Flink command line interface.
>
> From the Flink docs, I can add to the classpath with:
>
> -C,--classpath    Adds a URL to each user code
>                   classloader on all nodes in the
>                   cluster. The paths must specify a
>                   protocol (e.g. file://) and be
>                   accessible on all nodes (e.g. by means
>                   of a NFS share). You can use this
>                   option multiple times for specifying
>                   more than one URL. The protocol must
>                   be supported by the {@link
>                   java.net.URLClassLoader}.
>
> My job has 8 task managers (8 different nodes) with 8 slots each.
> Does my external classpath have to be on an NFS share? Can I not have the
> config directory on each machine in the same location? For instance, on
> node 1 the config files are in the directory
> /home/was/classpathconfig/, and the same on every node. Does it have to
> be on NFS?
> My command looks like this: flink run -C file://home/was/classpathconfig/
> -c com.visa.flink.cli.Main flink-job-assembly-0.1-SNAPSHOT.jar flink druid
>
> I also tried to put my files in S3 and run: flink run -C s3://
> flink.dev/config -c com.visa.flink.cli.Main
> flink-job-assembly-0.1-SNAPSHOT.jar flink druid
>
> Bad syntax for classpath: s3://flink.dev/config
>
>
> S3 does support URLClassLoader, but I get the error saying bad syntax.
>
>
> Please let me know your thoughts. Thanks a lot to this community; I was able
> to write my code in a week.
>
>
> Thanks,
>
> Vishwas
>
>


Re: **RegistrationTimeoutException** after TaskExecutor successfully registered at resource manager

2019-08-09 Thread Biao Liu
Hi Victor,

Several related issues have been reported before [1] [2] [3]. I guess you
have encountered the same problem.
It was fixed in 1.8 [4]. Could you try a later version
(1.8+)?

1. https://issues.apache.org/jira/browse/FLINK-11137
2. https://issues.apache.org/jira/browse/FLINK-11215
3. https://issues.apache.org/jira/browse/FLINK-11708
4. https://issues.apache.org/jira/browse/FLINK-11718

Thanks,
Biao /'bɪ.aʊ/



On Fri, Aug 9, 2019 at 4:01 PM Victor Wong 
wrote:

> Hi,
>
> I’m using Flink version *1.7.1*, and I encountered this exception which
> was a little weird from my point of view;
>
> TaskManager successfully registered at resource manager, however after 5
> minutes (which is the default value of taskmanager.registration.timeout
> config) it threw out RegistrationTimeoutException;
>
>
>
> Here is the related logs of TM:
>
> 2019-08-09 01:30:24,061 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
> to ResourceManager akka.tcp://flink@xxx
> /user/resourcemanager().
>
> 2019-08-09 01:30:24,296 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved
> ResourceManager address, beginning registration
>
> 2019-08-09 01:30:24,296 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Registration at ResourceManager attempt 1 (timeout=100ms)
>
> 2019-08-09 01:30:24,379 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- *Successful
> registration at resource manager* akka.tcp://flink@xxx/user/resourcemanager
> under registration id 4535dea14648f6de68f32fb1a375806e.
>
> 2019-08-09 01:30:24,404 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive
> slot request AllocationID{372d1e10019c93c6c41d52b449cea5f2} for job
> e7b86795178efe43d7cac107c6cb8c33 from resource manager with leader id
> .
>
> …
>
> 2019-08-09 01:30:33,590 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Un-registering task and sending final execution state FINISHED to
> JobManager for task Source:  ; // *I don’t know if this is related,
> so I add it here in case;  This Flink Kafka source just finished because it
> consumed no Kafka partitions (Flink Kafka parallelism > Kafka topic
> partitions)*
>
> …
>
> 2019-08-09 01:35:24,753 ERROR
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Fatal error
> occurred in TaskExecutor akka.tcp://flink@xxx/user/taskmanager_0.
>
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> Could not register at the ResourceManager within the specified maximum
> registration duration 30 ms. This indicates a problem with this
> instance. Terminating now.
>
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1037)
>
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$3(TaskExecutor.java:1023)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>
>at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> Thanks,
>
> Victor
>


Re: need help

2019-08-08 Thread Biao Liu
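Hi,

The exception shows an AskTimeoutException. You could adjust the two
parameters akka.ask.timeout and web.timeout and try again. The default
values are:

akka.ask.timeout: 10 s
web.timeout: 1

PS: Searching for "AskTimeoutException Flink" turns up many related answers.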

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 7:33 PM 陈某  wrote:

>
>
> -- Forwarded message -
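> From: 陈某
> Date: Thu, 8 Aug 2019 at 7:25 PM
> Subject: need help
> To:
>
>
> Hello, I am a beginner who has just started with Flink. After setting up a
> Flink on YARN cluster, I started the ZooKeeper, Hadoop, YARN and Flink
> clusters in turn, and ran into a problem when submitting a job to YARN. I
> searched online for related problems but have not found a solution yet. I
> hope to get some help, thank you.
>
> The command I ran was:
> [root@flink01 logs]# flink run -m  yarn-cluster
> ./examples/streaming/WordCount.jar
> The error in the log is as follows (the complete log file is attached):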
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 91e82fd8626bde4c901bf0b1639e12e7)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:208)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> [Internal server error.,  akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#2035575525]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at
> 

Re: Capping RocksDb memory usage

2019-08-08 Thread Biao Liu
Hi Cam,

AFAIK, that's not an easy thing. Actually it's more like a Rocksdb issue.
There is a document explaining the memory usage of Rocksdb [1]. It might be
helpful.

You could define your own option to tune Rocksdb through
"state.backend.rocksdb.options-factory" [2]. However I would suggest not to
do this unless you are fully experienced of Rocksdb. IMO it's quite
complicated.

Meanwhile I can share a bit experience of this. We have tried to put the
cache and filter into block cache before. It's useful to control the memory
usage. But the performance might be affected at the same time. Anyway you
could try and tune it. Good luck!

1. https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
2.
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#tuning-rocksdb

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 11:44 AM Cam Mach  wrote:

> Yes, that is correct.
> Cam Mach
> Software Engineer
> E-mail: cammac...@gmail.com
> Tel: 206 972 2768
>
>
>
> On Wed, Aug 7, 2019 at 8:33 PM Biao Liu  wrote:
>
>> Hi Cam,
>>
>> Do you mean you want to limit the memory usage of RocksDB state backend?
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Thu, Aug 8, 2019 at 2:23 AM miki haiat  wrote:
>>
>>> I think using metrics exporter is the easiest way
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rocksdb
>>>
>>>
>>> On Wed, Aug 7, 2019, 20:28 Cam Mach  wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> What is the easiest and most efficient way to cap RocksDB's memory usage?
>>>>
>>>> Thanks,
>>>> Cam
>>>>
>>>>


Re: Question about barriers on the Flink website

2019-08-07 Thread Biao Liu
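Hi 范瑞,

Barrier alignment does not involve the output/input queues here. What is
pending is only the small amount of data buffered for alignment.

If you want to understand how checkpointing works, I suggest reading the two
papers cited in the documentation. [1] [2]

If you want to understand Flink's concrete implementation: this document
belongs to the internals section, so you may need to read the relevant
code. [3] [4]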

1. https://arxiv.org/abs/1506.08603
2.
https://www.microsoft.com/en-us/research/publication/distributed-snapshots-determining-global-states-distributed-system/?from=https%3A%2F%2Fresearch.microsoft.com%2Fen-us%2Fum%2Fpeople%2Flamport%2Fpubs%2Fchandy.pdf
3.
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
4.
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java

Thanks,
Biao /'bɪ.aʊ/



On Wed, Aug 7, 2019 at 2:11 PM ❄ <836961...@qq.com> wrote:

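> Hi,
> On this page of the Flink website (
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/internals/stream_checkpointing.htm),
> the third step of the barrier alignment description says:
>  •  Once the last stream has received barrier n, the operator emits all
> pending outgoing records, and then emits snapshot n barriers itself.
>
> This sentence says that once barrier n has been received from all upstream
> streams, the operator instance sends all pending output records and then
> sends its own barrier n downstream. What data do the "pending output
> records" refer to? Do they mean the data from before the barrier that is
> still in the output queue? They cannot be the data after the barrier,
> because under exactly-once semantics the data after the barrier should not
> be processed before the snapshot; it can only be processed after the
> synchronous snapshot finishes. If they are the pre-barrier data still in the
> output queue, then that data cannot be sent out immediately either; the
> operator should also consider whether the downstream input queue has enough
> space.
>
>
>
> I look forward to your answer. Thank you!
>
> 范瑞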


Re: Does RocksDBStateBackend need a separate RocksDB service?

2019-08-07 Thread Biao Liu
Hi wanglei,

> Is there an embedded RocksDB service in the Flink task?

Yes, and "RocksDB is an embeddable persistent key-value store for fast
storage". [1]

1. http://rocksdb.org/

Thanks,
Biao /'bɪ.aʊ/



On Wed, Aug 7, 2019 at 7:27 PM miki haiat  wrote:

> There is no need to add an external RocksDB instance.
> *The RocksDBStateBackend holds in-flight data in a RocksDB
> database that is (per default) stored in the
> TaskManager data directories.*
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>
>
>
> On Wed, Aug 7, 2019 at 1:25 PM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> In my code, I just call setStateBackend with an HDFS directory:
>>    env.setStateBackend(new RocksDBStateBackend("hdfs://user/test/job"));
>>
>> Is there an embedded RocksDB service in the Flink task?
>>
>> --
>> wangl...@geekplus.com.cn
>>
>


Re: Capping RocksDb memory usage

2019-08-07 Thread Biao Liu
Hi Cam,

Do you mean you want to limit the memory usage of RocksDB state backend?

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 2:23 AM miki haiat  wrote:

> I think using metrics exporter is the easiest way
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rocksdb
>
>
> On Wed, Aug 7, 2019, 20:28 Cam Mach  wrote:
>
>> Hello everyone,
>>
>> What is the easiest and most efficient way to cap RocksDB's memory usage?
>>
>> Thanks,
>> Cam
>>
>>


Re: Can Flink help us solve the following use case

2019-08-07 Thread Biao Liu
Hi Yoandy,

Could you explain more of your requirements?
Why do you want to split data into "time slices"? Do you want to do some
aggregations or just give each record a tag or tags?
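
If the goal is only to give each record a tag based on how old it is, one
option might be to derive the tags from the event time whenever a record is
read, so that nothing has to be "un-tagged" later. A minimal sketch in plain
Java, not Flink API: the class `SliceTagger` and its method are made up for
illustration, and only the tag names come from your example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: derives the slice tags of an event from its age. */
final class SliceTagger {
    private static final long TEN_MINUTES = TimeUnit.MINUTES.toMillis(10);
    private static final long HALF_HOUR = TimeUnit.MINUTES.toMillis(30);
    private static final long ONE_HOUR = TimeUnit.HOURS.toMillis(1);

    /** Returns every tag that applies to an event from eventMillis when the clock reads nowMillis. */
    static List<String> tagsFor(long eventMillis, long nowMillis) {
        long age = nowMillis - eventMillis;
        List<String> tags = new ArrayList<>();
        if (age < 0) {
            return tags; // event is in the future relative to "now": no tag
        }
        // The slices are nested, so a recent event carries all wider tags too.
        if (age < TEN_MINUTES) tags.add("Last 10 minutes");
        if (age < HALF_HOUR) tags.add("Last 1/2 Hour");
        if (age < ONE_HOUR) tags.add("Last Hour");
        return tags;
    }
}
```

With this, an event from 0:55:00 carries all three tags at 0:59:59 and only
the last two at 1:09:59. Whether that fits depends on the answers to the
questions above.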

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 4:52 AM Sameer Wadkar  wrote:

> You could do this using custom triggers and evictors in Flink. That way
> you can control when the windows fire and what elements are fired with it.
> And lastly the custom evictors know when to remove elements from the
> window.
>
> Yes Flink can support it.
>
> Sent from my iPhone
>
> > On Aug 7, 2019, at 4:19 PM, Yoandy Rodríguez 
> wrote:
> >
> > Hello everybody,
> >
> > We have the following situation:
> >
> > 1) A data stream which collects all system events (nearly half a million
> > per day).
> >
> > 2) A database storing some aggregation of the data.
> >
> > We want to split the data into different "time slices" and be able to
> > "tag it" accordingly.
> >
> > Example:
> >
> > the events in the first hour will be tagged as such:
> >
> > Time of arrival (slice)    Tag
> > 0:00:00 - 0:59:59          Last Hour
> > 0:30:00 - 0:59:59          Last 1/2 Hour
> > 0:50:00 - 0:59:59          Last 10 minutes
> >
> > Now, when we reach 1:09:59, the "last ten minutes" tag moves to that
> > slice, and so do the other ones.
> >
> > My initial idea was to have multiple windows operating over the same
> > stream, but in that case I would have to keep a longer window just to
> > remove the tag for events after the 1 hour period. Is there any way to
> > avoid this?
> >
> >
> > P.S.
> >
> > This is part of my first Flink project so alternative
> > solutions/literature are very much welcome
> >
> >
>


Re: [EXTERNAL] Re: Delayed processing and Rate limiting

2019-08-07 Thread Biao Liu
Hi Shakir,

I'm not sure I have fully understood your requirements. I'll try to answer
your questions.

From my understanding, Flink has no built-in feature that supports "rate
limiting" directly. I guess you need to implement one yourself.
Either MapFunction or AsyncFunction could satisfy your requirement well IMO.
The difference between them is whether you need asynchronous processing or
not. [1]

Currently, Flink's back pressure cannot be based on user-defined conditions.
It's an internal flow-control strategy of Flink. [2] I don't think that's
the thing you want.
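
For the "100 requests per minute" case, a hand-written limiter could look
roughly like the token bucket below. This is a sketch in plain Java, not a
Flink API: the class `TokenBucket` and its methods are made up for
illustration. The idea would be to call the blocking `acquire()` at the start
of `map()`; while it waits, the operator stops consuming input, and Flink's
normal back pressure then slows down the upstream.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical token bucket: at most `permits` calls per period, refilled continuously. */
final class TokenBucket {
    private final double capacity;
    private final double refillPerNano;
    private double tokens;
    private long lastNanos;

    TokenBucket(int permits, long period, TimeUnit unit, long startNanos) {
        this.capacity = permits;
        this.refillPerNano = permits / (double) unit.toNanos(period);
        this.tokens = permits;       // start with a full bucket
        this.lastNanos = startNanos;
    }

    /** Non-blocking: consumes one token and returns true if one is available at nowNanos. */
    synchronized boolean tryAcquire(long nowNanos) {
        long elapsed = Math.max(0L, nowNanos - lastNanos);
        tokens = Math.min(capacity, tokens + elapsed * refillPerNano);
        lastNanos = nowNanos;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    /** Blocking: polls until a token is available. The bucket must be created with System.nanoTime(). */
    void acquire() throws InterruptedException {
        while (!tryAcquire(System.nanoTime())) {
            Thread.sleep(1);
        }
    }
}
```

The clock is passed in explicitly so the refill logic can be checked without
waiting. Note that each parallel instance would hold its own bucket, so the
per-instance rate has to be the total rate divided by the parallelism.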

1.
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/asyncio.html
2.
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/back_pressure.html

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 4:13 AM PoolakkalMukkath, Shakir <
shakir_poolakkalmukk...@comcast.com> wrote:

> Hi Victor,
>
>
>
> Thanks for the reply and it helps. For the delayed processing, this is
> exactly what I was looking for.
>
>
>
> But for the rate limit, the approach you suggested can only control the
> number of parallel requests. What I am looking for is a way to limit the
> number of requests per second or minute, e.g. 100 requests per minute, where
> anything more than that should trigger backpressure.
>
> A reference on how to create backpressure based on a condition would also
> help me if I need to build my own rate limiter.
>
>
>
> Thanks in advance
>
> Shakir
>
>
>
> *From: *Victor Wong 
> *Date: *Wednesday, August 7, 2019 at 10:59 AM
> *To: *"PoolakkalMukkath, Shakir" ,
> user 
> *Subject: *[EXTERNAL] Re: Delayed processing and Rate limiting
>
>
>
> Hi Shakir,
>
>
>
> > Delayed Processing
>
> Maybe you can make use of the function
> ‘org.apache.flink.streaming.api.TimerService#registerProcessingTimeTimer’,
> check this doc for more details:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>
>
>
> > Rate Limit External Data Access
>
> With AsyncFunction, you can set a ‘capacity’ which *defines how many
> asynchronous requests may be in progress at the same time, * I’m not sure
> if this is what you need or not.
>
> Check this doc for more details:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api
>
>
>
> Best,
>
> Victor
>
>
>
> *From: *"PoolakkalMukkath, Shakir" 
> *Date: *Wednesday, August 7, 2019 at 10:06 PM
> *To: *user 
> *Subject: *Delayed processing and Rate limiting
>
>
>
> Hi Flink Team,
>
>
>
> I am looking for some direction/recommendation for below tasks
>
>
>
>    1. *Delayed Processing:*
>
> We have a use case where we need to process events after a time delay from
> event time. Let’s say the event happened at time t1 and reached Flink
> immediately, but I have to wait until t1+2min to process it.
>
> We are sourcing the events from Kafka, and we would like this applied after
> the SourceFn. Maybe we can do this with Thread.sleep() in a MapFn, but we are
> looking for a better way to achieve this.
>
>
>
>    2. *Rate Limit External Data Access*
>
> What are the best practices for rate limiting calls to an external service?
> It can be either in a MapFn or an AsyncFn. What is the recommended approach
> to rate limit and build backpressure?
>
>
>
> Thanks in advance
>
>
>
> Thanks,
>
> Shakir
>


Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread Biao Liu
Congrats Hequn!

Thanks,
Biao /'bɪ.aʊ/



On Wed, Aug 7, 2019 at 6:00 PM Zhu Zhu  wrote:

> Congratulations to Hequn!
>
> Thanks,
> Zhu Zhu
>
> On Wed, Aug 7, 2019 at 5:16 PM Zili Chen wrote:
>
>> Congrats Hequn!
>>
>> Best,
>> tison.
>>
>>
>> On Wed, Aug 7, 2019 at 5:14 PM Jeff Zhang wrote:
>>
>>> Congrats Hequn!
>>>
>>> On Wed, Aug 7, 2019 at 5:08 PM Paul Lam wrote:
>>>
>>>> Congrats Hequn! Well deserved!
>>>>
>>>> Best,
>>>> Paul Lam
>>>>
>>>> On Aug 7, 2019, at 16:28, jincheng sun wrote:
>>>>
>>>> Hi everyone,
>>>>
>>>> I'm very happy to announce that Hequn accepted the offer of the Flink
>>>> PMC to become a committer of the Flink project.
>>>>
>>>> Hequn has been contributing to Flink for many years, mainly working on
>>>> SQL/Table API features. He's also frequently helping out on the user
>>>> mailing lists and helping check/vote the release.
>>>>
>>>> Congratulations Hequn!
>>>>
>>>> Best, Jincheng
>>>> (on behalf of the Flink PMC)



>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>


Re: Abnormal jobmanager log

2019-08-05 Thread Biao Liu
Hello,

> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED
> SIGNAL 15: SIGTERM. Shutting down as requested.

This means the process received signal 15 [1]. Wong is right: search the logs
of the YARN node manager or the YARN resource manager.

1. https://access.redhat.com/solutions/737033

Thanks,
Biao /'bɪ.aʊ/



On Tue, Aug 6, 2019 at 12:30 PM Wong Victor 
wrote:

> Hi,
>   You can check the YARN log on the node where the jobmanager runs and
>   search for why the corresponding container was killed.
>
> Regards
>
> On 2019/8/6, 11:40 AM, "戴嘉诚"  wrote:
>
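> Hi all,
>
>
>
> My Flink is deployed on YARN as a session cluster. This morning the
> jobmanager exited on its own and YARN then brought it back up, which
> restarted the jobs running in it. But when I check the logs, the jobmanager
> log shows no exception at all, and the jobmanager had neither long full GCs
> nor frequent GCs. The jobmanager log is below: at 06:44 the log notes that a
> stop request was received, and then the jobmanager simply stopped... What
> could have caused this?
>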
>
> > 2019-08-06 06:43:58,891 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7843 for job e49624208fe771c4c9527799fd46f2a3 (5645215 bytes in 801 ms).
> > 2019-08-06 06:43:59,336 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7852 @ 1565045039321 for job a9a7464ead55474bea6f42ed8e5de60f.
> > 2019-08-06 06:44:00,971 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7852 @ 1565045040957 for job 79788b218e684cb31c1ca0fcc641e89f.
> > 2019-08-06 06:44:01,357 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7852 for job a9a7464ead55474bea6f42ed8e5de60f (25870658 bytes in 1806 ms).
> > 2019-08-06 06:44:02,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7852 for job 79788b218e684cb31c1ca0fcc641e89f (29798945 bytes in 1849 ms).
> > 2019-08-06 06:44:05,101 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7852 @ 1565045045092 for job 03f3a0bd53c21f90f70ea01916dc9f78.
> > 2019-08-06 06:44:06,547 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7844 @ 1565045046522 for job 486a1949d75863f823013d87b509d228.
> > 2019-08-06 06:44:07,311 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7844 for job 486a1949d75863f823013d87b509d228 (62458942 bytes in 736 ms).
> > 2019-08-06 06:44:07,506 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7852 for job 03f3a0bd53c21f90f70ea01916dc9f78 (105565032 bytes in 2366 ms).
> > 2019-08-06 06:44:08,087 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7853 @ 1565045048055 for job 32783d371464265ef536454055ae6182.
> > 2019-08-06 06:44:09,626 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 7050 of job 4b542195824ff7b7cdf749543fd368cb expired before completing.
> > 2019-08-06 06:44:09,647 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7051 @ 1565045049626 for job 4b542195824ff7b7cdf749543fd368cb.
> > 2019-08-06 06:44:12,006 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7853 for job 32783d371464265ef536454055ae6182 (299599482 bytes in 3912 ms).
> > 2019-08-06 06:44:12,972 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7853 @ 1565045052962 for job 16db5afe9a8cd7c6278030d5dec4c80c.
> > 2019-08-06 06:44:13,109 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7853 @ 1565045053080 for job 9c1394a2d2ff47c7852eff9f1f932535.
> > 2019-08-06 06:44:16,779 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7853 for job 16db5afe9a8cd7c6278030d5dec4c80c (152643149 bytes in 3666 ms).
> > 2019-08-06 06:44:18,598 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7828 for job 8df2b47f2a4c1ba0f7019ee5989f6e71 (837558245 bytes in 23472 ms).
> > 2019-08-06 06:44:19,193 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 7853 for job 9c1394a2d2ff47c7852eff9f1f932535 (594628825 bytes in 6067 ms).
> > 2019-08-06 06:44:19,238 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 5855 for job 108ce7f6f5f3e76b12fad9dbdbc8feba (45917615 bytes in 61819 ms).
> > 2019-08-06 06:44:19,248 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 5856 @ 1565045059238

Re: some confuse for data exchange of parallel operator

2019-08-05 Thread Biao Liu
Hi Kylin,

> Can this map record all data? Or does it only record data from one
> parallel instance of the upstream operator?

Neither of your guesses is correct. It depends on the partitioner between the
map operator and the upstream operator. You can find more in this document
[1].
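
As a rough illustration, the sketch below shows how the same input ends up on
2 parallel instances under a round-robin scheme (in the spirit of
`rebalance()`) and under a hash scheme (in the spirit of `keyBy()`). It is
plain Java, not Flink code: the class and method names are invented, and
Flink's real `keyBy()` uses its own hashing over key groups rather than
`hashCode()` modulo n.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of how a partitioner spreads records over parallel instances. */
final class PartitionerSketch {

    /** Round-robin: records are spread evenly, regardless of their content. */
    static List<List<String>> roundRobin(List<String> records, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (int i = 0; i < records.size(); i++) {
            subtasks.get(i % parallelism).add(records.get(i));
        }
        return subtasks;
    }

    /** Hash: all records with the same key land on the same instance. */
    static List<List<String>> byKeyHash(List<String> records, int parallelism) {
        List<List<String>> subtasks = emptySubtasks(parallelism);
        for (String record : records) {
            subtasks.get(Math.floorMod(record.hashCode(), parallelism)).add(record);
        }
        return subtasks;
    }

    private static List<List<String>> emptySubtasks(int parallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        return subtasks;
    }
}
```

In both cases each instance sees only its share of the data, so a map kept
inside one instance holds a partial count. With the hash scheme the share is
at least complete per key.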

1.
https://ci.apache.org/projects/flink/flink-docs-master/concepts/programming-model.html

Thanks,
Biao /'bɪ.aʊ/



On Mon, Aug 5, 2019 at 4:59 PM tangkailin  wrote:

> Hello,
>
>    I don’t know how parallel operators exchange data in Flink. For
> example, I define a map in an operator with parallelism n (n > 1) for
> counting. Can this map record all data? Or does it only record data from
> one parallel instance of the upstream operator?
>
>
>
> Thanks,
>
> Kylin
>


Re: BucketingSink memory usage analysis

2019-08-02 Thread Biao Liu
How about taking a heap dump and analyzing it?

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 1, 2019 at 6:07 PM 九思 <1048095...@qq.com> wrote:

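> A question for everyone: Flink on YARN, 1 TaskManager, 4 slots, TaskManager
> memory 4G, JobManager memory 1G. We use BucketingSink to write to HDFS and
> checkpoint every 3 seconds. About 100 records arrive per second, so roughly
> 300 per 3 seconds. Input records are about 427 bytes each: 427 bytes * 300 =
> 128100 bytes = 125 KB. Output records are about 80 bytes each: 80 bytes * 300
> = 24000 bytes = 23 KB. But for some reason the JVM of the Flink TaskManager
> uses far more than that, up to 1G at peak.
> JVM (Heap/Non-Heap)
> Type      Committed  Used     Maximum
> Heap      2.68 GB    863 MB   2.68 GB
> Non-Heap  84.3 MB    82.8 MB  -1 B
> Total     2.76 GB    946 MB   2.68 GB
>
>
>
> In theory, with a checkpoint every 3s, data is written to HDFS once per
> checkpoint. That is, only 3s of data should be cached in memory. Is there
> other data held in memory as well?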


Re: StreamingFileSink part file count reset

2019-08-02 Thread Biao Liu
Hi Sidhartha,

I don't think you should worry about this.

Currently the `StreamingFileSink` uses a long to keep this counter. The
maximum value of a long is 9,223,372,036,854,775,807. The counter would only
overflow if the number of files reached that value, and I don't think that
will happen. WRT the max filename length: Linux, for example, allows 255
bytes per filename on most file systems [1]. That is far more than the 19
digits of the largest long value.
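
The numbers are easy to check. The snippet below is only a back-of-the-envelope
sketch in plain Java; the class name and the `part-0-` prefix in the usage
note are placeholders chosen for illustration, not taken from Flink's code.

```java
/** Hypothetical helper for sanity-checking the bounds of a long part counter. */
final class PartCounterBounds {

    /** Number of characters needed to print the largest possible counter value. */
    static int maxCounterDigits() {
        return Long.toString(Long.MAX_VALUE).length();
    }

    /** Length of the longest file name made of the given prefix plus the counter. */
    static int longestNameLength(String prefix) {
        return prefix.length() + maxCounterDigits();
    }

    /** Whole 365-day years until a counter bumped filesPerSecond times per second reaches Long.MAX_VALUE. */
    static long yearsUntilOverflow(long filesPerSecond) {
        long secondsPerYear = 365L * 24 * 60 * 60;
        return Long.MAX_VALUE / filesPerSecond / secondsPerYear;
    }
}
```

For example, `longestNameLength("part-0-")` gives 26, well below 255, and
`yearsUntilOverflow(1_000_000L)` shows that even one million new part files
per second would take roughly 292,000 years to exhaust the counter.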

1.
https://unix.stackexchange.com/questions/32795/what-is-the-maximum-allowed-filename-and-folder-size-with-ecryptfs

Thanks,
Biao /'bɪ.aʊ/



On Fri, Aug 2, 2019 at 12:24 AM sidhartha saurav 
wrote:

> Thank you for the clarification Haibo and Andrey.
>
> Is there any limitation after which the global counter will reset ? I mean
> do we have to worry the counter may get too long and part file crosses the
> max filename length limit set by OS or is it handled by flink.
>
> Thanks
> Sidhartha
>
> On Tue, Jul 30, 2019, 10:10 AM Andrey Zagrebin 
> wrote:
>
>> Hi Sidhartha,
>>
>> This is a general limitation now because Flink does not keep counters for
>> all buckets but only a global one.
>> Flink assumes that the sink can write to any bucket any time and the
>> counter is not reset to not rewrite the previously written file number 0.
>>
>> Best,
>> Andrey
>>
>> On Tue, Jul 30, 2019 at 7:01 AM Haibo Sun  wrote:
>>
>>> Hi Sidhartha,
>>>
>>> Currently, the part counter is never reset to 0, and customizing the part
>>> filename is not supported. So I don't think there's any way to reset it
>>> right now. I guess the reason it can't be reset to 0 is the concern that
>>> previous parts would be overwritten. Although the bucket id is part of the
>>> part file path, StreamingFileSink does not know when the bucket id will
>>> change in the case of a custom BucketAssigner.
>>>
>>> Best,
>>> Haibo
>>>
>>> At 2019-07-30 06:13:54, "sidhartha saurav"  wrote:
>>>
>>> Hi,
>>>
>>> We are using StreamingFileSink with a custom BucketAssigner and
>>> DefaultRollingPolicy. The custom BucketAssigner is simply a date bucket
>>> assigner. The StreamingFileSink creates part files with name
>>> "part--". The
>>> count is an integer and is incrementing on each rollover. Now my doubts
>>> are:
>>>
>>> 1. When does this count reset to 0 ?
>>> 2. Is there a way i can reset this count programmatically ? Since we are
>>> using day bucket we would like the count to reset every day.
>>>
>>> We are using Flink 1.8
>>>
>>> Thanks
>>> Sidhartha
>>>
>>>


Re: Custom log appender for YARN

2019-08-01 Thread Biao Liu
Hi Gyula,

How about putting the appender class in the lib folder, but selecting the
appender only for user code through log4j.properties? That way only the
user code's log output would be sent to Kafka. That seems to be what you want...

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 1, 2019 at 2:25 PM Gyula Fóra  wrote:

> Hi Biao,
>
> Thanks for the input!
> This is basically where we got ourselves as well.
>
> We are trying to avoid putting things in the lib folder so separating the
> loggers would be great but we don't have a solution for it yet.
>
> Thanks
> Gyula
>
> On Thu, 1 Aug 2019, 07:09 Biao Liu wrote:
>
>> Hi Gyula,
>>
>> I guess it should work if you put the log appender jar under the
>> $flink/lib folder.
>>
>> There are two different kinds of classloader in your case. One is for the
>> Flink framework, the other is for user code. The framework classloader is
>> the parent of the user classloader. The parent classloader cannot find
>> classes that only the child classloader knows, due to the delegation model
>> [1]. In your case, `ClusterEntrypoint` is in the parent classloader, and
>> the log appender class in the fat jar is in the child classloader.
>>
>> So I think there are two ways to solve this.
>> 1. Put your appender class under the $flink/lib folder.
>> 2. Try to avoid using the user-defined appender for Flink framework
>> classes. (I'm not sure whether this can be done.)
>>
>> You can find more information about Flink's classloading strategy in the
>> documentation [2].
>>
>> 1. https://docs.oracle.com/javase/8/docs/api/java/lang/ClassLoader.html
>> 2.
>> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_classloading.html
>>
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Wed, Jul 31, 2019 at 9:21 PM Gyula Fóra  wrote:
>>
>>> Hi All!
>>>
>>> We are trying to configure a custom Kafka log appender for our YARN
>>> application and we hit the following problem.
>>>
>>> We included the log appender dependency in the fatjar of the application
>>> because in YARN that should be part of the system class path.
>>>
>>> However when the YARN cluster entrypoints run (YarnJobClusterEntrypoint,
>>> YarnTaskExecutorRunner) these only seem to have the contents of lib on
>>> their classpath. Does someone have any pointers to how the classpath is
>>> configured for running the entrypoints?
>>>
>>> The exception is this:
>>>
>>> java.lang.ClassNotFoundException: org.apache.kafka.log4jappender.KafkaLog4jAppender
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:264)
>>> at org.apache.log4j.helpers.Loader.loadClass(Loader.java:198)
>>> at org.apache.log4j.helpers.OptionConverter.instantiateByClassName(OptionConverter.java:327)
>>> at org.apache.log4j.helpers.OptionConverter.instantiateByKey(OptionConverter.java:124)
>>> at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:785)
>>> at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
>>> at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
>>> at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
>>> at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
>>> at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
>>> at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
>>> at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:81)
>>> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329)
>>> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
>>> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.<clinit>(ClusterEntrypoint.java:104)
>>>
>>>
>>> Thanks,
>>> Gyula
>>>
>>>
>>>


Re: env.readFile: read only new files

2019-08-01 Thread Biao Liu
I'm afraid there is nothing ready-made. You could write your own by extending
SourceFunction.
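
The core of such a source would be remembering which files were already
handed out, so that a touch on an old file changes nothing. Below is a minimal
sketch of that bookkeeping in plain Java. The class `NewFileTracker` is
hypothetical, not a Flink class; a real SourceFunction would also have to list
the directory periodically and keep the set in checkpointed state.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: reports each file path only the first time it shows up in a listing. */
final class NewFileTracker {
    private final Set<String> seen = new HashSet<>();

    /** Returns the paths from the current directory listing that no earlier call has returned. */
    List<String> filterNew(Collection<String> currentListing) {
        List<String> fresh = new ArrayList<>();
        // Sort for a deterministic order; modification times are deliberately ignored.
        for (String path : new TreeSet<>(currentListing)) {
            if (seen.add(path)) {
                fresh.add(path);
            }
        }
        return fresh;
    }
}
```

Because only the path is tracked, a file that is modified or touched after
its first appearance is never read again.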

Thanks,
Biao /'bɪ.aʊ/



On Wed, Jul 31, 2019 at 4:49 PM 王佩  wrote:

> The code is as follows:
>
> DataStreamSource source = env.readFile(
> textInputFormat,
> "/data/appData/streamingWatchFile/source",
> FileProcessingMode.PROCESS_CONTINUOUSLY,
> 10 * 1000
>  );
>
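> When a file in the monitored directory is modified, e.g. touched, the whole
> file is processed again.
>
> Is there any way to read only new files? I want to achieve the effect of
> reading only new Parquet files.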
>


Re: Custom log appender for YARN

2019-07-31 Thread Biao Liu
Hi Gyula,

I guess it should work if you put the log appender jar under the $flink/lib
folder.

There are two different kinds of classloader in your case. One is for the
Flink framework, the other is for user code. The framework classloader is the
parent of the user classloader. The parent classloader cannot find classes
that only the child classloader knows, due to the delegation model [1]. In
your case, `ClusterEntrypoint` is in the parent classloader, and the log
appender class in the fat jar is in the child classloader.
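
To make the direction of the lookup concrete, here is a toy model in plain
Java. It is not a real `java.lang.ClassLoader` and the class `ToyLoader` is
made up for illustration; it only mimics the rule that a loader asks its
parent but never its children. Flink's user-code classloader can also be
configured to look at its own classes first, but the parent still never looks
into the child.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Toy model of classloader delegation; class names stand in for real classes. */
final class ToyLoader {
    private final ToyLoader parent;        // null for the top-most loader
    private final Set<String> ownClasses;  // classes on this loader's own classpath

    ToyLoader(ToyLoader parent, String... ownClasses) {
        this.parent = parent;
        this.ownClasses = new HashSet<>(Arrays.asList(ownClasses));
    }

    /** Asks the parent first, then the own classpath; children are never consulted. */
    boolean canLoad(String className) {
        if (parent != null && parent.canLoad(className)) {
            return true;
        }
        return ownClasses.contains(className);
    }
}
```

With a framework loader that owns `ClusterEntrypoint` and a user loader
(child) that owns `KafkaLog4jAppender`, the user loader can load both, but
the framework loader cannot load the appender. That matches the
ClassNotFoundException you see.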

So I think there are two ways to solve this.
1. Put your appender class under the $flink/lib folder.
2. Try to avoid using the user-defined appender for Flink framework classes.
(I'm not sure whether this can be done.)

You can find more information about Flink's classloading strategy in the
documentation [2].

1. https://docs.oracle.com/javase/8/docs/api/java/lang/ClassLoader.html
2.
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_classloading.html


Thanks,
Biao /'bɪ.aʊ/



On Wed, Jul 31, 2019 at 9:21 PM Gyula Fóra  wrote:

> Hi All!
>
> We are trying to configure a custom Kafka log appender for our YARN
> application and we hit the following problem.
>
> We included the log appender dependency in the fatjar of the application
> because in YARN that should be part of the system class path.
>
> However when the YARN cluster entrypoints run (YarnJobClusterEntrypoint,
> YarnTaskExecutorRunner) these only seem to have the contents of lib on
> their classpath. Does someone have any pointers to how the classpath is
> configured for running the entrypoints?
>
> The exception is this:
>
> java.lang.ClassNotFoundException: org.apache.kafka.log4jappender.KafkaLog4jAppender
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:264)
>   at org.apache.log4j.helpers.Loader.loadClass(Loader.java:198)
>   at org.apache.log4j.helpers.OptionConverter.instantiateByClassName(OptionConverter.java:327)
>   at org.apache.log4j.helpers.OptionConverter.instantiateByKey(OptionConverter.java:124)
>   at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:785)
>   at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
>   at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
>   at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
>   at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
>   at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
>   at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
>   at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:81)
>   at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329)
>   at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
>   at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.<clinit>(ClusterEntrypoint.java:104)
>
>
> Thanks,
> Gyula
>
>
>


Re: Error while running flink job on local environment

2019-07-31 Thread Biao Liu
Hi Vinayak,

If `StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(2)` works for your case,
you could try as below.

`StreamExecutionEnvironment.setDefaultLocalParallelism(2);`
`StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();`

or

`StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();`
`env.setParallelism(2);`

These should be equivalent as far as the code is concerned.


On Wed, Jul 31, 2019 at 2:51 PM Vinayak Magadum 
wrote:

> Hi Andrey and Jeff,
>
> Thank you for the reply.
> I agree with Jeff. My concern is to use different code for local and
> non-local deployments.
> It would help if StreamExecutionEnvironment.getExecutionEnvironment()
> works for both local and cluster deployments.
> 
> Thanks & Regards,
> Vinayak
>
>
> On Wed, Jul 31, 2019 at 7:02 AM Jeff Zhang  wrote:
>
>> @Andrey,
>>
>> Although your approach will work, it requires the user to write different
>> code for local mode and other modes. This is inconvenient for users.
>> IMHO, we should not check these kinds of memory configuration in local
>> mode. Or implicitly set the memory of TM pretty large in local mode to
>> avoid this kind of problem.
>>
>> On Wed, Jul 31, 2019 at 1:32 AM Andrey Zagrebin wrote:
>>
>>> Hi Vinayak,
>>>
>>> the error message provides a hint about changing config options, you
>>> could try to use StreamExecutionEnvironment.createLocalEnvironment(2,
>>> customConfig); to increase resources.
>>> this issue might also address the problem, it will be part of 1.9
>>> release:
>>> https://issues.apache.org/jira/browse/FLINK-12852
>>>
>>> Best,
>>> Andrey
>>>
>>> On Tue, Jul 30, 2019 at 5:42 PM Vinayak Magadum <
>>> magadumvina...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using Flink version: 1.7.1
>>>>
>>>> I have a flink job that gets the execution environment as below and
>>>> executes the job.
>>>>
>>>> StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> When I run the code in cluster, it runs fine. But on local machine
>>>> while running the job via IntelliJ I get below error:
>>>>
>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>>>> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>
>>>> Caused by: java.io.IOException: Insufficient number of network buffers:
>>>> required 8, but only 3 available. The total number of network buffers is
>>>> currently set to 12851 of 32768 bytes each. You can increase this number by
>>>> setting the configuration keys 'taskmanager.network.memory.fraction',
>>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>>> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:272)
>>>> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
>>>> at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:278)
>>>> at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> Workaround that I tried to make it run on local is to use
>>>> StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.createLocalEnvironment(2);
>>>>
>>>> instead of StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> With Flink 1.4.2, StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment(); used to work on both
>>>> cluster as well as local environment.
>>>>
>>>> Is there any way to make
>>>> StreamExecutionEnvironment.getExecutionEnvironment(); work in both cluster
>>>> and local mode in flink 1.7.1? Specifically how to make it work locally via
>>>> IntelliJ.
>>>>
>>>> Thanks & Regards,
>>>> Vinayak

>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Fwd: Flink metrics with parallel operator

2019-07-31 Thread Biao Liu
Hi Sibendu,

Have you checked the metrics documentation [1]? It might be helpful.

BTW, I think it's better to ask on the user mailing list, so I'm forwarding
it there.

1.
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#system-scope


-- Forwarded message -
From: Sibendu Dey 
Date: Thu, Aug 1, 2019 at 2:42 AM
Subject: Flink metrics with parallel operator
To: 


Hello,

I have been working on a flink project and need some help with the metric
system.

I have a logic inside a process function which side outputs a particular
message on certain failure parameters.

This process function has a parallelism > 1. How do I keep track of the
failed messages through a counter metrics

which is scoped to the operator across all parallel instances?

Regards,
Sibendu


Re: Graceful Task Manager Termination and Replacement

2019-07-29 Thread Biao Liu
Hi Yu,

That's a great proposal. Wish to see this feature soon!

On Mon, Jul 29, 2019 at 4:59 PM Yu Li  wrote:

> Belated but FWIW, besides the region failover and best-efforts failover
> efforts, I believe stop with checkpoint as proposed in FLINK-12619 and
> FLIP-45 could also help here, FYI.
>
> W.r.t k8s, there're also some offline discussion about supporting local
> recovery with persistent volume even when task assigned to other TMs during
> job failover.
>
> [1] https://issues.apache.org/jira/browse/FLINK-12619
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-45%3A+Reinforce+Job+Stop+Semantic
>
> Best Regards,
> Yu
>
>
> On Wed, 24 Jul 2019 at 17:00, Aaron Levin  wrote:
>
>> I was on vacation but wanted to thank Biao for summarizing the current
>> state! Thanks!
>>
>> On Mon, Jul 15, 2019 at 2:00 AM Biao Liu  wrote:
>>
>>> Hi Aaron,
>>>
>>> From my understanding, you want to shut down a Task Manager without
>>> restarting the job which has tasks running on this Task Manager?
>>>
>>> With the current implementation, if a Task Manager goes down, the
>>> tasks on it are treated as failed. The behavior on task failure is
>>> defined via `FailoverStrategy`, which is `RestartAllStrategy` by default.
>>> That's the reason why the whole job restarts when a Task Manager is
>>> gone. As Paul said, you could try the "region restart failover strategy"
>>> when 1.9 is released. It might be helpful; however, it depends on your job
>>> topology.
>>>
>>> The deeper reason for this issue is the consistency semantics of Flink,
>>> AT_LEAST_ONCE or EXACTLY_ONCE. Flink must respect these semantics, so there
>>> is not much choice of `FailoverStrategy`.
>>> It might be improved in the future. There are some discussions on the
>>> mailing list about providing weaker consistency semantics to improve
>>> the `FailoverStrategy`. We are pushing this improvement forward. I hope it
>>> can be included in 1.10.
>>>
>>> Regarding your question, I guess the answer is no for now. More
>>> frequent checkpoints or a manually triggered savepoint might help by
>>> making recovery quicker.
>>>
>>>
>>> On Fri, Jul 12, 2019 at 10:25 AM Paul Lam wrote:
>>>
>>>> Hi,
>>>>
>>>> Maybe region restart strategy can help. It restarts minimum required
>>>> tasks. Note that it’s recommended to use only after 1.9 release, see [1],
>>>> unless you’re running a stateless job.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-10712
>>>>
>>>> Best,
>>>> Paul Lam
>>>>
>>>> On Jul 12, 2019, at 03:38, Aaron Levin wrote:
>>>>
>>>> Hello,
>>>>
>>>> Is there a way to gracefully terminate a Task Manager beyond just
>>>> killing it (this seems to be what `./taskmanager.sh stop` does)?
>>>> Specifically I'm interested in a way to replace a Task Manager that has
>>>> currently-running tasks. It would be great if it was possible to terminate
>>>> a Task Manager without restarting the job, though I'm not sure if this is
>>>> possible.
>>>>
>>>> Context: at my work we regularly cycle our hosts for maintenance and
>>>> security. Each time we do this we stop the task manager running on the host
>>>> being cycled. This causes the entire job to restart, resulting in downtime
>>>> for the job. I'd love to decrease this downtime if at all possible.
>>>>
>>>> Thanks! Any insight is appreciated!
>>>>
>>>> Best,
>>>>
>>>> Aaron Levin
>>>>
>>>>
>>>>


Re: How to access state from another map functions

2019-07-28 Thread Biao Liu
Hi Yuta,

No, you can't get the same state from different functions. That's not the
semantics of state. And these two functions might run in different
threads/processes even on different machines.

On Mon, Jul 29, 2019 at 11:12 AM Yuta Morisawa 
wrote:

> Hi all
>
> I want to use the same state for multiple times.
>
> stream
> .keyBy(0)
> .map(new MyRichMapfunction)
> ...
> .keyBy(0)
> .map(new MyRichMapfunction2)
>
>
> In this snippet, I want access the same state in MyRichMapfunction and
> MyRichMapfunction2, but I failed in spite of keying the same key.
> Is there any way to access the state from other class, or is there a
> kind of global state.
>
> --
> Thank you.
> Yuta
>
>


Re: Job Manager becomes irresponsive if the size of the session cluster grows

2019-07-26 Thread Biao Liu
Hi Prakhar,

Sorry, I don't have much experience with k8s. Maybe someone else could help.

On Fri, Jul 26, 2019 at 6:20 PM Prakhar Mathur  wrote:

> Hi,
>
> We were deploying our Flink clusters on YARN earlier and then moved
> to Kubernetes, but back then our clusters were not this big. Have you guys
> seen issues with the JobManager REST server becoming unresponsive on
> Kubernetes before?
>
> On Fri, Jul 26, 2019, 14:28 Biao Liu  wrote:
>
>> Hi Prakhar,
>>
>> Sorry, I could not find anything abnormal in your GC log and stack
>> trace.
>> Have you tried deploying the cluster some other way, not on
>> Kubernetes, for example on YARN or standalone? Just to narrow down the scope.
>>
>>
>> On Tue, Jul 23, 2019 at 12:34 PM Prakhar Mathur 
>> wrote:
>>
>>>
>>> On Mon, Jul 22, 2019, 16:08 Prakhar Mathur  wrote:
>>>
>>>> Hi,
>>>>
>>>> We enabled GC logging, here are the logs
>>>>
>>>> [GC (Allocation Failure) [PSYoungGen: 6482015K->70303K(6776832K)]
>>>> 6955827K->544194K(20823552K), 0.0591479 secs] [Times: user=0.09 sys=0.00,
>>>> real=0.06 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6587039K->38642K(6763008K)]
>>>> 7060930K->512614K(20809728K), 0.0740978 secs] [Times: user=0.08 sys=0.00,
>>>> real=0.07 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6502858K->192077K(6734848K)]
>>>> 6976829K->666144K(20781568K), 0.0841759 secs] [Times: user=0.17 sys=0.00,
>>>> real=0.09 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6647378K->50108K(6759424K)]
>>>> 7121446K->524248K(20806144K), 0.0622997 secs] [Times: user=0.08 sys=0.00,
>>>> real=0.07 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6501890K->60606K(6779904K)]
>>>> 6976029K->534961K(20826624K), 0.0637955 secs] [Times: user=0.09 sys=0.00,
>>>> real=0.06 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6586046K->40411K(6768640K)]
>>>> 7060401K->514839K(20815360K), 0.0729137 secs] [Times: user=0.08 sys=0.00,
>>>> real=0.07 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6543919K->51886K(6797824K)]
>>>> 7018346K->526385K(20844544K), 0.0649143 secs] [Times: user=0.09 sys=0.00,
>>>> real=0.07 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6601690K->191832K(6754304K)]
>>>> 7076190K->666427K(20801024K), 0.1029686 secs] [Times: user=0.18 sys=0.00,
>>>> real=0.10 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6742947K->62693K(6781952K)]
>>>> 7217543K->537361K(20828672K), 0.0639272 secs] [Times: user=0.09 sys=0.00,
>>>> real=0.06 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6589986K->66299K(6770688K)]
>>>> 7064653K->541039K(20817408K), 0.0701853 secs] [Times: user=0.08 sys=0.00,
>>>> real=0.07 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6590742K->42995K(6800896K)]
>>>> 7065481K->517798K(20847616K), 0.0595729 secs] [Times: user=0.08 sys=0.00,
>>>> real=0.06 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6608678K->66127K(6793728K)]
>>>> 7083482K->541011K(20840448K), 0.0608270 secs] [Times: user=0.09 sys=0.00,
>>>> real=0.06 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6608886K->62063K(6822400K)]
>>>> 7083769K->537027K(20869120K), 0.0675917 secs] [Times: user=0.10 sys=0.00,
>>>> real=0.07 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6617146K->200674K(6812160K)]
>>>> 7092110K->812325K(20858880K), 1.1685237 secs] [Times: user=3.53 sys=0.71,
>>>> real=1.17 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6773610K->194848K(6633984K)]
>>>> 7385261K->806700K(20680704K), 0.0858601 secs] [Times: user=0.19 sys=0.00,
>>>> real=0.09 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6617888K->44002K(6723072K)]
>>>> 7229740K->655854K(20769792K), 0.0647387 secs] [Times: user=0.09 sys=0.00,
>>>> real=0.06 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6420043K->194672K(6702080K)]
>>>> 7031895K->806604K(20748800K), 0.0833792 secs] [Times: user=0.18 sys=0.00,
>>>> real=0.08 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6603376K->187059K(6596096K)]
>>>> 7215308K->799063K(20642816K), 0.0906529 secs] [Times: user=0.17 sys=0.00,
>>>> real=0.09 secs]
>>>> [GC (Allocation Failure) [PSYoungGen: 6572690K->51850K(6715904K)]
>>>>

Re: Job Manager becomes irresponsive if the size of the session cluster grows

2019-07-26 Thread Biao Liu
>
>> "gcs-async-channel-pool-19" - Thread t@116997
>>java.lang.Thread.State: TIMED_WAITING
>> at sun.misc.Unsafe.park(Native Method)
>> - parking to wait for <1be52a4a> (a
>> java.util.concurrent.SynchronousQueue$TransferStack)
>> at
>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>> at
>> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>> at
>> java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>> at
>> java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
>> at
>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>Locked ownable synchronizers:
>> - None
>>
>> "RMI TCP Connection(36)-127.0.0.1" - Thread t@114070
>>java.lang.Thread.State: TIMED_WAITING
>> at java.lang.Object.wait(Native Method)
>> - waiting on <22b14b0b> (a
>> com.sun.jmx.remote.internal.ArrayNotificationBuffer)
>> at
>> com.sun.jmx.remote.internal.ArrayNotificationBuffer.fetchNotifications(ArrayNotificationBuffer.java:449)
>> at
>> com.sun.jmx.remote.internal.ArrayNotificationBuffer$ShareBuffer.fetchNotifications(ArrayNotificationBuffer.java:227)
>> at
>> com.sun.jmx.remote.internal.ServerNotifForwarder.fetchNotifs(ServerNotifForwarder.java:274)
>> at
>> javax.management.remote.rmi.RMIConnectionImpl$4.run(RMIConnectionImpl.java:1270)
>> at
>> javax.management.remote.rmi.RMIConnectionImpl$4.run(RMIConnectionImpl.java:1268)
>> at
>> javax.management.remote.rmi.RMIConnectionImpl.fetchNotifications(RMIConnectionImpl.java:1274)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)
>> at sun.rmi.transport.Transport$1.run(Transport.java:200)
>>     at sun.rmi.transport.Transport$1.run(Transport.java:197)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
>> at
>> sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)
>> at
>> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834)
>> at
>> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
>> at
>> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$592/690739656.run(Unknown
>> Source)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at
>> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> On Thu, Jul 18, 2019 at 12:25 PM Biao Liu  wrote:
>>
>>> Hi,
>>>
>>> Your GC metrics look fine. You could double-check the
>>> GC log if you enable it; the GC log is more direct.
>>> I'm not sure what's happening in your JobManager, but I'm pretty sure
>>> that Flink can support far larger clusters than yours.
>>>
>>> Have you checked the JobManager log file? Are there any
>>> suspicious warnings or errors?
>>> Have you tried any analysis tools, such as jstack, to check the internal
>>> state of the JobManager?
>>>
>>> It's hard to do a deeper analysis based on the current information. It
>>> would help if you could provide more details.
>>>
>>>
>>> On Thu, Jul 18, 2019 at 2:12 PM Prakhar Mathur  wrote:
>>>
>>>> Hi,
>>>>
>>>> We are using v1.6.2; currently, the number of TaskManagers is 70. We
>>>> also have the GC metrics on a dashboard. The sum of
>>>> Status.JVM.GarbageCollector.MarkSweepCompact.Time grouped by 1 min is
>>>> somewhere between 75 and 125,
>>>> and Status.JVM.GarbageCollector.MarkSweepCompact.Count is fixed at 10.
>>>>
>>>> On Thu, Jul 18, 2019 at 11:32 AM Biao Liu  wrote:
>>>>
>>>>> Hi Prakhar,
>>>>>
>>>>> Have you checked the garbage collection of the master?
>>>>> Which version of Flink are you using? How many TaskManagers are in your
>>>>> cluster?
>>>>>
>>>>>
>>>>> On Thu, Jul 18, 2019 at 1:54 PM Prakhar Mathur  wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We have deployed multiple Flink clusters on Kubernetes with 1
>>>>>> replica of the JobManager and multiple TaskManagers as per the requirement.
>>>>>> Recently we have observed that on increasing the number of TaskManagers
>>>>>> for a cluster, the JobManager becomes unresponsive. It stops sending statsd
>>>>>> metrics at irregular intervals. The JobManager pod even keeps
>>>>>> restarting because it stops responding to the liveness probe, which
>>>>>> results in Kubernetes killing the pod. We tried increasing the resources
>>>>>> given (CPU, RAM) but it didn't help.
>>>>>>
>>>>>> Regards
>>>>>> Prakhar Mathur
>>>>>> Product Engineer
>>>>>> GO-JEK
>>>>>>
>>>>>


Re: Execution environments for testing: local vs collection vs mini cluster

2019-07-26 Thread Biao Liu
Hi Juan,

Sorry for the late reply.

1. The environments for DataStream and DataSet are not the same. An obvious
difference is that the DataStream environments always carry a "Stream"
prefix. For example, StreamExecutionEnvironment is for DataStream, while
ExecutionEnvironment and CollectionEnvironment are for DataSet.

You could use "StreamExecutionEnvironment.createLocalEnvironment" to run or
test a DataStream job. Use ExecutionEnvironment.createLocalEnvironment or
CollectionEnvironment to run or test a DataSet job.

Actually you could also use
StreamExecutionEnvironment.getExecutionEnvironment
or ExecutionEnvironment.getExecutionEnvironment, because they choose a
local environment automatically if you run the job standalone (in an IDE,
or by executing the main method directly).

2. Regarding MiniCluster, IMO it's a bit internal. The MiniCluster runs
as the backend behind the local environment. I think the mini cluster in
Flink plays a subtly different role from the mini cluster in Hadoop.

3. I will try to answer your questions below.

> Which test execution environment is recommended for each test use case?
It depends on which mode you are testing, DataStream or DataSet.

> For example I don't see why I would use CollectionEnvironment when I have
> the local environment available and running on several threads, what is a
> good use case for CollectionEnvironment?
The official documentation says "CollectionEnvironment is a low-overhead
approach for executing Flink programs". As I don't have much experience with
DataSet, I just checked the relevant code. The CollectionEnvironment does
not seem to start a mini cluster, so I believe it executes the job in a
lighter way.
BTW, there is no equivalent environment for DataStream.

> Are all these 3 environments supported equally, or maybe some of them is
> expected to be deprecated?
As mentioned above, they are obviously not the same.
If a class is deprecated, it is marked with the "Deprecated" annotation.

> Are there any additional execution environments that could be useful for
> testing on a single host?
I would suggest following the official documents [1][2] that you have
discovered, even though there might be other ways that seem to be
equivalent. If you depend on some internal implementation, it might
change over time without notice.


1.
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/testing.html#integration-testing
2.
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/local_execution.html


On Tue, Jul 23, 2019 at 11:30 PM Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi Biao,
>
> Thanks for your answer.
>
> 1. Integration tests for my project.
> 2. Both data stream and data sets
>
>
>
> On Mon, Jul 22, 2019 at 11:44 PM Biao Liu  wrote:
>
>> Hi Juan,
>>
>> I'm not sure what you really want. Before giving some suggestions, could
>> you answer the questions below first?
>>
>> 1. Do you want to write a unit test (or integration test) case for your
>> project or for Flink? Or just want to run your job locally?
>> 2. Which mode do you want to test? DataStream or DataSet?
>>
>>
>>
>> On Tue, Jul 23, 2019 at 1:12 PM Juan Rodríguez Hortalá  wrote:
>>
>>> Hi,
>>>
>>> In
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/local_execution.html
>>> and
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html
>>> I see there are 3 ways to create an execution environment for testing:
>>>
>>>    - StreamExecutionEnvironment.createLocalEnvironment and
>>>    ExecutionEnvironment.createLocalEnvironment create an execution
>>>    environment running on a single JVM using different threads.
>>>    - CollectionEnvironment runs on a single JVM on a single thread.
>>>    - I haven't found much documentation on the Mini Cluster, but it
>>>    sounds similar to the Hadoop MiniCluster
>>>    <https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CLIMiniCluster.html>.
>>>    If that is the case, then it would run on many local JVMs, each of them
>>>    running multiple threads.
>>>
>>> Am I correct about the Mini Cluster? Is there any additional
>>> documentation about it? I discovered it looking at the source code of
>>> AbstractTestBase, that is mentioned on
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/testing.html#integration-testing.
>>> Also, it looks like launching the mini cluster registers it somewhere, so
>>> subsequent calls to `StreamExecutionEnviron

Re: Execution environments for testing: local vs collection vs mini cluster

2019-07-23 Thread Biao Liu
Hi Juan,

I'm not sure what you really want. Before giving some suggestions, could
you answer the questions below first?

1. Do you want to write a unit test (or integration test) case for your
project or for Flink? Or just want to run your job locally?
2. Which mode do you want to test? DataStream or DataSet?



On Tue, Jul 23, 2019 at 1:12 PM Juan Rodríguez Hortalá  wrote:

> Hi,
>
> In
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/local_execution.html
> and
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html
> I see there are 3 ways to create an execution environment for testing:
>
>    - StreamExecutionEnvironment.createLocalEnvironment and
>    ExecutionEnvironment.createLocalEnvironment create an execution environment
>    running on a single JVM using different threads.
>    - CollectionEnvironment runs on a single JVM on a single thread.
>    - I haven't found much documentation on the Mini Cluster, but it
>    sounds similar to the Hadoop MiniCluster
>    <https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CLIMiniCluster.html>.
>    If that is the case, then it would run on many local JVMs, each of them
>    running multiple threads.
>
> Am I correct about the Mini Cluster? Is there any additional documentation
> about it? I discovered it looking at the source code of AbstractTestBase,
> that is mentioned on
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/testing.html#integration-testing.
> Also, it looks like launching the mini cluster registers it somewhere, so
> subsequent calls to `StreamExecutionEnvironment.getExecutionEnvironment`
> return an environment that uses the mini cluster. Is that performed by
> `executionEnvironment.setAsContext()` in
> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java#L56
> ? Is that execution environment registration process documented anywhere?
>
> Which test execution environment is recommended for each test use case?
> For example I don't see why I would use CollectionEnvironment when I have
> the local environment available and running on several threads, what is a
> good use case for CollectionEnvironment?
>
> Are all these 3 environments supported equally, or maybe some of them is
> expected to be deprecated?
>
> Are there any additional execution environments that could be useful for
> testing on a single host?
>
> Thanks,
>
> Juan
>
>
>


Re: Time extracting in flink

2019-07-22 Thread Biao Liu
Hi Andy,

As far as I know, Flink does not support a feature like that.

I would suggest recording and calculating the time in user code.
For example, add a timestamp field (maybe an array) to your record and
have each processing step write a timestamp into it.
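
A minimal sketch of this idea in plain Java, with no Flink dependency. The
class name TimedRecord and its methods are hypothetical, made up for
illustration; in a real job each operator (doX, doY, ...) would call stamp()
itself, and the record type would have to be serializable by Flink. Note
also that timestamps taken on different machines are only comparable if the
clocks are synchronized.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical record wrapper that carries one timestamp per processing step. */
class TimedRecord<T> {
    final T payload;
    // One entry per step, in the order the steps handled the record.
    final List<Long> timestamps = new ArrayList<>();

    TimedRecord(T payload) {
        this.payload = payload;
    }

    /** Called by each step when it handles the record; returns this for chaining. */
    TimedRecord<T> stamp(long nowMillis) {
        timestamps.add(nowMillis);
        return this;
    }

    /** Elapsed milliseconds between two stamped steps (0-based indices). */
    long elapsedBetween(int fromStep, int toStep) {
        return timestamps.get(toStep) - timestamps.get(fromStep);
    }

    public static void main(String[] args) {
        TimedRecord<String> record = new TimedRecord<>("event-1");
        // Fixed values stand in for System.currentTimeMillis() in each step.
        record.stamp(1_000L)   // e.g. when doY starts
              .stamp(1_040L)   // after the async call returns
              .stamp(1_100L);  // just before the sink
        System.out.println(record.elapsedBetween(0, 2) + " ms"); // prints "100 ms"
    }
}
```

The last step (for example the sink) can then log elapsedBetween(...) for
whichever pair of steps matters for auditing.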


On Mon, Jul 22, 2019 at 4:49 PM Andy Hoang  wrote:

> Hi guys,
>
> I’m trying to write ELK logs for Flink; this would help us store/calculate
> the processing time of a group of operators for business auditing.
>
> I read about process_function and Debugging Windows & Event Time in the docs.
> They focus on “keyed” events and monitoring using the web UI/metrics, whereas
> I want to, for example:
>
> val stream = env.add_source(kafka_source).map(doX).map(doY).filter(Z)
> Async.unordedWait(stream, calApiBlah).add_source(kafka_source)
>
> Track the total time from `doY` until just before the add_source() operator.
> This kind of time can be processing time,
> or I want to track the total time from source to sink of a stream event
> (maybe ingestion time?),
> So the goal is to retrieve the total time of a set of chosen operators.
>
> I think Flink should support this somehow, and I just haven’t
> found it in the docs yet.
>
> Thanks,
>
> Andy,
>
>
>


Re: Extending REST API with new endpoints

2019-07-22 Thread Biao Liu
Hi,

As far as I know, the REST handlers are not pluggable, and from your
description I don't see a strong reason to make them so.
Could you explain more about your requirement?


On Sat, Jul 20, 2019 at 4:36 AM Oytun Tez  wrote:

> Yep, I scanned all of the issues in Jira and the codebase, I couldn't find
> a way to plug my new endpoint in.
>
> I am basically trying to open up an endpoint for queryable state client. I
> also read somewhere that this may cause some issues due to SSL
> communication within the cluster.
>
> Any pointers?
>
>
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Fri, Jul 19, 2019 at 3:53 PM Oytun Tez  wrote:
>
>> Hi there,
>>
>> I am trying to add a new endpoint to the REST API by
>> extending AbstractRestHandler. But this new handler needs to be added
>> in WebMonitorEndpoint, which exposes no interface for external extension.
>>
>> Can I do this with 1.8? Is there any other way, or are there plans to make this possible?
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>


Re: apache flink: Why checkpoint coordinator takes long time to get completion

2019-07-19 Thread Biao Liu
Hi Xiangyu,

I just took a glance at the relevant code. There is a gap between
calculating the duration and logging it. I guess checkpoint 4 finished
within 1 minute, but some unexpected time-consuming operation ran between
those two points. I can't tell which part it is, though.
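
To put a number on that gap, the two log timestamps quoted below can be
compared with the duration the coordinator reports. A rough sketch in plain
Java; the class and method names are made up for illustration, and it
assumes both log lines come from the same JobManager log and use the
timestamp pattern shown there.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: wall-clock time between two log lines vs. reported duration. */
class CheckpointGap {
    // Matches log timestamps such as "2019-07-10 11:59:03,893".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds between the two log lines that the reported duration does not cover. */
    public static long unexplainedMillis(String triggeredAt, String completedAt, long reportedMillis) {
        LocalDateTime start = LocalDateTime.parse(triggeredAt, LOG_TIME);
        LocalDateTime end = LocalDateTime.parse(completedAt, LOG_TIME);
        return Duration.between(start, end).toMillis() - reportedMillis;
    }

    public static void main(String[] args) {
        // Values copied from the "Triggering" and "Completed" lines for checkpoint 4.
        long gap = unexplainedMillis("2019-07-10 11:59:03,893", "2019-07-10 12:05:01,836", 58645L);
        System.out.println("Unexplained gap: " + gap + " ms"); // prints "Unexplained gap: 299298 ms"
    }
}
```

That is roughly five minutes between the two log lines that the reported
58645 ms does not account for.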


On Fri, Jul 19, 2019 at 4:14 PM Xiangyu Su  wrote:

> Dear flink community,
>
> We are doing a POC with Flink (1.8) to process data in real time, using
> global checkpointing (S3) and local checkpointing (EBS), with the cluster
> deployed on EKS. Our application consumes data from Kinesis.
>
> For my test I am using a checkpointing interval of 5 min and a minimum
> pause of 2 min.
>
> The issue we saw is that the Flink checkpointing process seems to
> be idle for 3-4 min before the job manager gets the completion notification.
>
> Here is some logging from the job manager:
>
> 2019-07-10 11:59:03,893 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4 @ 1562759941082 for job e7a97014f5799458f1c656135712813d.
> 2019-07-10 12:05:01,836 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 4 for job e7a97014f5799458f1c656135712813d (22387207650 bytes in 58645 ms).
>
> As I understand the logging above, the
> completedCheckpoint (CheckpointCoordinator)
> object was completed in 58645 ms, but the whole checkpointing process
> took ~6 min.
>
> This logging is for the 4th checkpoint; the first 3 checkpoints
> finished on time.
> Could you please tell me why Flink checkpointing in my test started to
> "idle" for a few minutes after 3 checkpoints?
>
> Best Regards
> --
> Xiangyu Su
> Java Developer
> xian...@smaato.com
>
> Smaato Inc.
> San Francisco - New York - Hamburg - Singapore
> www.smaato.com
>
> Germany:
> Valentinskamp 70, Emporio, 19th Floor
> 20355 Hamburg
> M 0049(176)22943076
>
> The information contained in this communication may be CONFIDENTIAL and is
> intended only for the use of the recipient(s) named above. If you are not
> the intended recipient, you are hereby notified that any dissemination,
> distribution, or copying of this communication, or any of its contents, is
> strictly prohibited. If you have received this communication in error,
> please notify the sender and delete/destroy the original message and any
> copy of it from your computer or paper files.
>


Fwd: Issue running basic example locally

2019-07-19 Thread Biao Liu
Just forwarding this to the user mailing list; it's not a development issue.

-- Forwarded message -
From: Caizhi Weng 
Date: Fri, Jul 19, 2019 at 8:56 AM
Subject: Re: Issue running basic example locally
To: 


Hi Andres,

The `provided` scope of flink-streaming-java seems suspicious; can you
remove it and see what happens?

On Fri, Jul 19, 2019 at 3:10 AM Andres Angel  wrote:

> Hello everyone,
>
> I'm using IntelliJ in an Ubuntu environment with Java 1.8 to run my Flink
> framework. My goal is to consume a few Kinesis streams and build a sort
> of pipeline. Initially I built the attached pom.xml file and a test
> Java class, also attached.
>
> However, once I run my class from the IDE I get this error:
>
> Error: A JNI error has occurred, please check your installation and try
> again
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/api/functions/source/SourceFunction
> at java.lang.Class.getDeclaredMethods0(Native Method)
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
> at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
> at java.lang.Class.getMethod0(Class.java:3018)
> at java.lang.Class.getMethod(Class.java:1784)
> at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
> at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.api.functions.source.SourceFunction
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 7 more
>
> I think I don't need to install or configure any Flink service before
> running my job via the IDE. If someone could give me a hand with this
> issue, I would appreciate it.
>
> thanks
>
>
>


Re: Re: 44 GB of logs generated under Flink's log folder

2019-07-19 Thread Biao Liu
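The most fundamental fix is of course to remove the code that writes the
log. This source is not built into Flink, so naturally Flink cannot control
the behavior of your custom source.

You could consider editing log4j.properties yourself and turning this logger
off manually. The log4j.properties bundled with Flink contains an example
that you can adapt. Change

log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
file
to log4j.logger.com.JavaCustoms.FlinkJMSStreamSource=OFF, file

But this is clearly an ERROR, so it's best to actually fix it; otherwise
you are just burying your head in the sand.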


On Fri, Jul 19, 2019 at 2:20 PM Henry  wrote:

>
>
>
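> Hello, thanks! Yes, this source is a custom source implemented with JMS.
> We are still investigating the cause, but how can we stop Flink from
> writing logs so explosively? It can fill the disk in 20 minutes; it wrote
> more than 40 GB.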
>
>
>
>
>
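> At 2019-07-19 11:11:37, "Caizhi Weng"  wrote:
> >Hi Henry,
> >
> >This source doesn't look like one that Flink provides; the problem is most
> >likely in the source's own implementation. You may need to modify the
> >source's code so that it shuts down, or handles the error in some other
> >way, when it fails...
> >
> >On Fri, Jul 19, 2019 at 9:31 AM Henry  wrote:
> >
> >>  Hi all, the error screenshots from before didn't come through, so I'm
> >> posting them again.
> >> Links to the error screenshots:
> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >>
> >>
> >>
> >> From what I can tell from the errors: my source uses ActiveMQ. The Flink
> >> job started receiving messages at 9 a.m. yesterday and ran normally until
> >> 8 a.m. today. Then at 8:04 this morning it started throwing errors
> >> heavily, with Flink writing logs into the log folder. The second
> >> screenshot shows where the errors begin: ActiveMQ appears to time out,
> >> then the consumer is closed and the log is written to nonstop.
> >>
> >> Is there any way to configure Flink so that it doesn't keep writing logs?
> >> As it stands we can't really do test runs: as soon as an error occurs it
> >> keeps logging until the server's disk is full and everything crashes.
> >> Please help.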
>


Re: Apache Flink - Side output time semantics for DataStream

2019-07-19 Thread Biao Liu
Hi,

I'm not sure exactly what you mean. Could you describe your requirements
in more detail?

On Sun, Jul 14, 2019 at 9:33 AM M Singh  wrote:

> Hi:
>
> I wanted to find out what is the timestamp associated with the elements of
> a stream side output with different stream time characteristics.
>
> Thanks
>
> Man
>


Re: Apache Flink - Event time and process time timers with same timestamp

2019-07-19 Thread Biao Liu
Hi,

Is it possible to use two different `TimeCharacteristic` values in one job
at the same time?
I guess the answer is no, so I don't think such a scenario exists.


On Fri, Jul 19, 2019 at 12:19 AM M Singh  wrote:

> Hey Folks - Just checking if you have any pointers for me.  Thanks for
> your advice.
>
> On Sunday, July 14, 2019, 03:12:25 PM EDT, M Singh 
> wrote:
>
>
> Also, are the event time timers and processing time timers handled
> separately - ie,  if I register event time timer and then use the same
> timestamp to delete processing time timer - will it remove the event time
> timer registration ?
>
> In the example
> https://github.com/streaming-with-flink/examples-scala/blob/master/src/main/scala/io/github/streamingwithflink/chapter6/CoProcessFunctionTimers.scala#L87
>  should
> the delete and register be on the processing time timer and is it ok to
> delete event time timer and register processing time timer ?
>
>   override def processElement2(
>       switch: (String, Long),
>       ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
>       out: Collector[SensorReading]): Unit = {
>
>     // enable reading forwarding
>     forwardingEnabled.update(true)
>     // set disable forward timer
>     val timerTimestamp = ctx.timerService().currentProcessingTime() + switch._2
>     val curTimerTimestamp = disableTimer.value()
>     if (timerTimestamp > curTimerTimestamp) {
>       // remove current timer and register new timer
>       ctx.timerService().deleteEventTimeTimer(curTimerTimestamp)      // <- line in question
>       ctx.timerService().registerProcessingTimeTimer(timerTimestamp)  // <- line in question
>       disableTimer.update(timerTimestamp)
>     }
>   }
>
>
>
>
>
> On Sunday, July 14, 2019, 01:52:44 PM EDT, M Singh 
> wrote:
>
>
> Hi:
>
> If I register both event time timer and processing time timer with the
> same timestamp for a particular key - will they both fire or only one will
> fire ? If only one, what will be its time domain ?
>
> Thanks
>


Re: Apache Flink - Manipulating the state of an object in an evictor or trigger

2019-07-19 Thread Biao Liu
Hi,

I couldn't find any official documentation about it.

There are several state-related methods in `TriggerContext`, so I believe
it's absolutely safe to use state in a `Trigger` through `TriggerContext`.

Regarding `Evictor`, there are no such methods in `EvictorContext`. After
taking a glance at the relevant code, I guess it's OK for now, but I suggest
not manipulating state in an `Evictor` until there is an official guarantee.

BTW, I'm just wondering why you want to manipulate state in an `Evictor`.
If your requirement is reasonable, maybe we could support it officially.


On Sun, Jul 14, 2019 at 9:07 PM M Singh  wrote:

> Hi:
>
> Is it safe to manipulate the state of an object in the evictor or
> trigger? Are there any best practices/dos and don'ts on this?
>
> Thanks
>


Re: Writing Flink logs into specific file

2019-07-18 Thread Biao Liu
Hi Soheil,

> I was wondering if it is possible to save logs into a specified file?

Yes, of course.

> I put the following file in the resource directory of the project but it
> has no effect

I guess that's because log4j takes priority. The document [1]
says "Users willing to use logback instead of log4j can just exclude log4j
(or delete it from the lib/ folder)."

1.
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html
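
If staying on log4j is an option, the target file can also be set there
instead of in logback.xml. A sketch of a log4j.properties, assuming the
log4j 1.x properties format that Flink 1.8 ships with; the file name
flink_logs.txt comes from the question, and the pattern is only an example.

```properties
# Send everything at INFO and above to a single file appender.
log4j.rootLogger=INFO, file

log4j.appender.file=org.apache.log4j.FileAppender
# Path of the log file; a relative path resolves against the working directory.
log4j.appender.file.file=flink_logs.txt
# false = overwrite the file on each start instead of appending to it.
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss.SSS} [%t] %-5p %c - %m%n
```

When running from an IDE this file would go on the classpath (for example
src/main/resources); a standalone cluster reads the log4j.properties files
in its conf/ directory.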


On Fri, Jul 19, 2019 at 2:03 AM Soheil Pourbafrani  wrote:

> Hi,
>
> When we run a Flink application, some logs are generated about the
> run, in both local and distributed environments. I was wondering if
> it is possible to save the logs into a specified file?
>
> I put the following file in the resource directory of the project but it
> has no effect:
> logback.xml
>
> 
> 
> flink_logs.txt
> false
> 
> %d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
> %X{sourceThread} - %msg%n
> 
> 
>
> 
> 
> 
>
>


Re: Job Manager becomes irresponsive if the size of the session cluster grows

2019-07-18 Thread Biao Liu
Hi,

Your GC metrics look fine. You could double-check the GC
log if you enable it; the GC log is more direct.
I'm not sure what's happening in your JobManager, but I'm pretty sure that
Flink can support far larger clusters than yours.

Have you checked the JobManager log file? Are there any suspicious
warnings or errors?
Have you tried any analysis tools, such as jstack, to check the internal
state of the JobManager?

It's hard to do a deeper analysis based on the current information. It
would help if you could provide more details.
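
If GC logging is enabled, long pauses can be picked out mechanically. A
minimal sketch in plain Java, assuming HotSpot-style log entries that end
in "[Times: user=... sys=..., real=N secs]" (the format Java 8 prints with
-XX:+PrintGCDetails); the class name, method name and 0.5 s threshold are
arbitrary choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that extracts long stop-the-world pauses from a GC log. */
class GcPauseScanner {
    // Captures the wall-clock pause of one GC entry, e.g. "real=1.17 secs".
    private static final Pattern REAL_PAUSE = Pattern.compile("real=(\\d+\\.\\d+) secs");

    /** Returns every "real" pause (in seconds) that exceeds the threshold, in log order. */
    public static List<Double> pausesAbove(String gcLog, double thresholdSecs) {
        List<Double> result = new ArrayList<>();
        Matcher m = REAL_PAUSE.matcher(gcLog);
        while (m.find()) {
            double pause = Double.parseDouble(m.group(1));
            if (pause > thresholdSecs) {
                result.add(pause);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two sample entries in the assumed format.
        String sample =
            "[GC (Allocation Failure) [PSYoungGen: 6608886K->62063K(6822400K)] "
          + "7083769K->537027K(20869120K), 0.0675917 secs] "
          + "[Times: user=0.10 sys=0.00, real=0.07 secs]\n"
          + "[GC (Allocation Failure) [PSYoungGen: 6617146K->200674K(6812160K)] "
          + "7092110K->812325K(20858880K), 1.1685237 secs] "
          + "[Times: user=3.53 sys=0.71, real=1.17 secs]\n";
        System.out.println(pausesAbove(sample, 0.5)); // prints "[1.17]"
    }
}
```

Because the pattern only looks for "real=... secs", it also works when the
mail client has wrapped a log entry across several lines.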


On Thu, Jul 18, 2019 at 2:12 PM Prakhar Mathur  wrote:

> Hi,
>
> We are using v1.6.2; currently, the number of TaskManagers is 70. We also
> have the GC metrics on a dashboard. The sum of
> Status.JVM.GarbageCollector.MarkSweepCompact.Time grouped by 1 min is
> somewhere between 75 and 125,
> and Status.JVM.GarbageCollector.MarkSweepCompact.Count is fixed at 10.
>
> On Thu, Jul 18, 2019 at 11:32 AM Biao Liu  wrote:
>
>> Hi Prakhar,
>>
>> Have you checked the garbage collection of the master?
>> Which version of Flink are you using? How many TaskManagers are in your
>> cluster?
>>
>>
>> On Thu, Jul 18, 2019 at 1:54 PM Prakhar Mathur  wrote:
>>
>>> Hello,
>>>
>>> We have deployed multiple Flink clusters on Kubernetes with 1 replica
>>> of the JobManager and multiple TaskManagers as per the requirement.
>>> Recently we have observed that on increasing the number of TaskManagers
>>> for a cluster, the JobManager becomes unresponsive. It stops sending statsd
>>> metrics at irregular intervals. The JobManager pod even keeps
>>> restarting because it stops responding to the liveness probe, which
>>> results in Kubernetes killing the pod. We tried increasing the resources
>>> given (CPU, RAM) but it didn't help.
>>>
>>> Regards
>>> Prakhar Mathur
>>> Product Engineer
>>> GO-JEK
>>>
>>


Re: Job Manager becomes irresponsive if the size of the session cluster grows

2019-07-18 Thread Biao Liu
Hi Prakhar,

Have you checked the garbage collection of the master?
Which version of Flink are you using? How many TaskManagers are in your
cluster?


On Thu, Jul 18, 2019 at 1:54 PM Prakhar Mathur  wrote:

> Hello,
>
> We have deployed multiple Flink clusters on Kubernetes with 1 replica of
> the JobManager and multiple TaskManagers as per the requirement. Recently
> we have observed that on increasing the number of TaskManagers for a
> cluster, the JobManager becomes unresponsive. It stops sending statsd
> metrics at irregular intervals. The JobManager pod even keeps restarting
> because it stops responding to the liveness probe, which results in
> Kubernetes killing the pod. We tried increasing the resources given
> (CPU, RAM) but it didn't help.
>
> Regards
> Prakhar Mathur
> Product Engineer
> GO-JEK
>


Re: 44 GB of logs generated under Flink's log folder

2019-07-17 Thread Biao Liu
Hi Henry,

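The mailing list doesn't seem to support inline images, so I can't tell
what you mean by "error messages keep being produced, but I can't see
exactly where the error is". Could you try uploading the screenshots to a
third-party host and posting the links here? I don't know whether there is
a better way.

To zhisheng2018: why do you keep replying with empty emails?


On Thu, Jul 18, 2019 at 12:49 AM zhisheng2...@gmail.com  wrote: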

>


Re: Questions about user doc.

2019-07-17 Thread Biao Liu
Hi Vishwas,

> I am guessing this means that Flink executes successive tasks from
different pipelines successively right ?

As the document says, "Note that Flink often executes successive tasks
concurrently: For Streaming programs, that happens in any case, but also
for batch programs, it happens frequently." So I think "successively" is
not accurate, at least for streaming jobs.

> I also don't fully understand Intermediate result partition and
> Intermediate dataset, why are there two boxes in the diagram for
> intermediate result after the first execution job vertex? Is there any
> more docs I can read to clearly understand these diagrams, thanks for your
> help.

1. The "Intermediate dataset" is a logical concept described in the
JobGraph, while the "Intermediate result partition" is more of a physical
concept described in the ExecutionGraph. The "Intermediate result
partition" is the parallel version of the "Intermediate dataset".
2. This document is under the "Internals" part, so it refers to internal
implementation details. There might not be as much documentation as you
would wish. The document contains links for its critical concepts, which
point to the Flink GitHub repository. Sometimes the code is the best
documentation :)
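
To make point 1 more concrete, here is a toy model in plain Python. It is
only a sketch of the relationship described above, not Flink's
implementation: the classes merely borrow the concept names, and the
`expand` helper and the "map" vertex name are made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntermediateDataSet:
    """Logical output of one job vertex (JobGraph level)."""
    producer: str


@dataclass(frozen=True)
class IntermediateResultPartition:
    """Output of one parallel subtask (ExecutionGraph level)."""
    dataset: IntermediateDataSet
    subtask_index: int


def expand(dataset, parallelism):
    """Hypothetical helper: one partition per parallel producer subtask."""
    return [IntermediateResultPartition(dataset, i) for i in range(parallelism)]


# One logical dataset produced by a vertex that runs with parallelism 3
# corresponds to three physical partitions.
partitions = expand(IntermediateDataSet(producer="map"), parallelism=3)
```

In this model the diagram's two boxes correspond to the single logical
dataset on one side and its per-subtask partitions on the other.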


On Wed, 17 Jul 2019 at 1:40 PM, Vishwas Siravara wrote:

> Hey guys,
> In this document:
> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
> there is a line in the beginning of the scheduling section which says
> that: "A pipeline consists of multiple successive tasks, such as the
> *n-th* parallel instance of a MapFunction together with the *n-th*
> parallel instance of a ReduceFunction. Note that Flink often executes
> successive tasks concurrently:"
>
> I am guessing this means that Flink executes successive tasks from
> different pipelines successively, right?
>
> I also don't fully understand Intermediate result partition and
> Intermediate dataset, why are there two boxes in the diagram for
> intermediate result after the first execution job vertex? Is there any
> more docs I can read to clearly understand these diagrams, thanks for your
> help.
>
> Thanks,
> Vishwas
>


Re: Even key distribution workload

2019-07-15 Thread Biao Liu
Hi Navneeth,

The "keyBy" semantics require all data with the same key to go to the same
task. So basically this data skew issue is caused by your data
distribution. As far as I know, Flink does not handle data skew very well.
There is a proposal about local aggregation which is still under
discussion on the dev mailing list. It can alleviate the data skew, but I
guess it still needs some time.

As Caizhi mentioned, it's better to do something in user code as a
workaround, for example redistributing the skewed data.
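
One common way to redistribute skewed data is "key salting" with a
two-stage aggregation. The sketch below is plain Python rather than Flink
API code, and all function names in it are hypothetical. It assumes the
aggregation can be merged from partial results (counts, sums and the
like): stage 1 aggregates per (key, salt) so that a hot key is spread over
several sub-keys, and stage 2 drops the salt and merges the partial
results.

```python
import random
from collections import defaultdict


def salted_key(key, num_salts, rng):
    """Spread one logical key over num_salts sub-keys."""
    return (key, rng.randrange(num_salts))


def two_stage_count(records, num_salts=4, seed=0):
    """Count records per key using salted partial counts.

    Returns (partial, total): partial maps (key, salt) to a count,
    total maps key to the merged count.
    """
    rng = random.Random(seed)

    # Stage 1: partial counts per (key, salt). In a real job each
    # (key, salt) pair could be handled by a different parallel task.
    partial = defaultdict(int)
    for key in records:
        partial[salted_key(key, num_salts, rng)] += 1

    # Stage 2: drop the salt and merge the partial counts per key.
    total = defaultdict(int)
    for (key, _salt), count in partial.items():
        total[key] += count

    return dict(partial), dict(total)


# A heavily skewed input: one hot user and one cold user.
records = ["hot_user"] * 1000 + ["cold_user"] * 3
partial, total = two_stage_count(records, num_salts=4)
```

The final result per key is unchanged, but no single sub-key has to
process all records of the hot key. The price is an extra shuffle and a
second aggregation step.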


On Mon, 15 Jul 2019 at 2:38 PM, Navneeth Krishnan wrote:

> Hi All,
>
> Currently I have a keyBy user and I see uneven load distribution since
> some of the users would have very high load versus some users having very
> few messages. Is there a recommended way to achieve even distribution of
> workload? Has someone else encountered this problem and what was the
> workaround?
>
> Thanks
>


Re: Graceful Task Manager Termination and Replacement

2019-07-15 Thread Biao Liu
Hi Aaron,

From my understanding, you want to shut down a Task Manager without
restarting the job that has tasks running on it?

With the current implementation, if a Task Manager goes down, the tasks on
it are treated as failed. The behavior on task failure is defined by the
`FailoverStrategy`, which is `RestartAllStrategy` by default. That is why
the whole job restarts when a Task Manager is gone. As Paul said, you could
try the "region restart failover strategy" once 1.9 is released. It might
help, though that depends on your job topology.

The deeper reason for this issue is Flink's consistency semantics,
AT_LEAST_ONCE or EXACTLY_ONCE. Flink must respect these semantics, so there
is not much choice of `FailoverStrategy`.
This might be improved in the future. There are some discussions on the
mailing list about providing weaker consistency semantics to allow a more
flexible `FailoverStrategy`. We are pushing this improvement forward, and I
hope it can be included in 1.10.

Regarding your question, I guess the answer is no for now. More frequent
checkpoints, or a manually triggered savepoint, might help by making
recovery quicker.


On Fri, 12 Jul 2019 at 10:25 AM, Paul Lam wrote:

> Hi,
>
> Maybe the region restart strategy can help. It restarts only the minimum
> set of required tasks. Note that it’s recommended to use it only after the
> 1.9 release, see [1], unless you’re running a stateless job.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10712
>
> Best,
> Paul Lam
>
> On 12 Jul 2019, at 03:38, Aaron Levin wrote:
>
> Hello,
>
> Is there a way to gracefully terminate a Task Manager beyond just killing
> it (this seems to be what `./taskmanager.sh stop` does)? Specifically I'm
> interested in a way to replace a Task Manager that has currently-running
> tasks. It would be great if it was possible to terminate a Task Manager
> without restarting the job, though I'm not sure if this is possible.
>
> Context: at my work we regularly cycle our hosts for maintenance and
> security. Each time we do this we stop the task manager running on the host
> being cycled. This causes the entire job to restart, resulting in downtime
> for the job. I'd love to decrease this downtime if at all possible.
>
> Thanks! Any insight is appreciated!
>
> Best,
>
> Aaron Levin
>
>
>

