blink (based on flink 1.5.1) fails to access a Kerberos-secured Hadoop YARN cluster

2020-01-21 Thread Yong

We are running blink, which is based on flink 1.5.1, against a Kerberos-secured Hadoop cluster.
In standalone mode the deployment can access Hadoop HDFS without problems: after the Kerberos
login the TMs and the jobs can read and write Hadoop. However, when we try to bring up a flink
session on YARN, the client cannot reach the Hadoop YARN ResourceManager. The command and its
output are below; the Kerberos login itself succeeds:

[qateadmin@UAT14475 bin]$ ./yarn-session.sh
2020-01-22 14:47:36,993 INFO 
org.apache.hadoop.security.UserGroupInformation
   - Login successful for user 
htlapi...@dc.sh.ctripcorp.com using keytab file /opt/data/blink/htlapidev.keytab
2020-01-22 14:47:36,994 INFO 
org.apache.flink.runtime.security.modules.HadoopModule   
 - Hadoop user set to htlapi...@dc.sh.ctripcorp.com (auth:KERBEROS)
2020-01-22 14:47:37,296 INFO 
org.apache.flink.yarn.cli.FlinkYarnSessionCli 
   - No path for the flink jar passed. Using the 
location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-01-22 14:47:37,450 INFO 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing 
over to rm2
2020-01-22 14:47:37,491 INFO 
org.apache.hadoop.io.retry.RetryInvocationHandler
  - Exception while invoking getClusterNodes of class 
ApplicationClientProtocolPBClientImpl over rm2 after 1 fail over attempts. 
Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Server 
asks us to fall back to SIMPLE auth, but this client is configured to only 
allow secure connections.; Host Details : local host is: "UAT14475/10.5.119.0"; 
destination host is: "uat14476":8032;
at 
org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772)
at 
org.apache.hadoop.ipc.Client.call(Client.java:1475)
at 
org.apache.hadoop.ipc.Client.call(Client.java:1408)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy16.getClusterNodes(Unknown 
Source)
at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterNodes(ApplicationClientProtocolPBClientImpl.java:262)
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy17.getClusterNodes(Unknown 
Source)
at 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNodeReports(YarnClientImpl.java:488)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:318)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:539)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:448)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:659)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$7(FlinkYarnSessionCli.java:887)
at 
java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:887)
Caused by: java.io.IOException: Server asks us to fall back to SIMPLE auth, but 
this client is configured to only allow secure connections.
at 
org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:755)
at 
org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
at 
org.apache.hadoop.ipc.Client.getConnection(Client.java:1524)
at 
org.apache.hadoop.ipc.Client.call(Client.java:1447)
... 22 more
2020-01-22 14:47:37,495 INFO 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing 
over to rm1
2020-01-22 14:47:37,498 INFO 
org.apache.hadoop.io.retry.RetryInvocationHandler
  - Exception while invoking getClusterNodes of class 
ApplicationClientProtocolPBClientImpl over rm1 after 2 fail over attempts. 
Trying to fail over after sleeping for 36261ms.
java.net.ConnectException: Call From UAT14475/10.5.119.0 to 
uat14475.novalocal:8032 failed on connection exception: 
java.net.ConnectException: Connection refused; For more details see: 
http://wiki.apache.org/hadoop/ConnectionRefused
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-21 Thread kant kodali
Hi Jark,

1) Shouldn't it be col1 to a List of rows? Multiple rows can have the same
joining key, right?

2) Can I use the state processor API from an external application to query
the intermediate results in near real-time? I thought querying rocksdb state
was a widely requested feature. It would be really great to consider this
feature for 1.11

3) Is there any interface where I can implement my own state backend?

Thanks!


On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:

> Hi Kant,
>
> 1) Yes, it will be stored in rocksdb statebackend.
> 2) In old planner, the left state is the same with right state which are
> both `>>`.
> It is a 2-level map structure, where the `col1` is the join key, it is
> the first-level key of the state. The key of the MapState is the input row,
> and the `count` is the number of this row, the expiredTime indicates
> when to cleanup this row (avoid infinite state size). You can find the
> source code here[1].
> In blink planner, the state structure will be more complex which is
> determined by the meta-information of upstream. You can see the source code
> of blink planner here [2].
> 3) Currently, the intermediate state is not exposed to users. Usually,
> users should write the query result to an external system (like Mysql) and
> query the external system.
> Query on the intermediate state is on the roadmap, but I guess it is
> not in 1.11 plan.
>
> Best,
> Jark
>
> [1]:
> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
> [2]:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>
>
> On Jan 21, 2020, at 18:01, kant kodali wrote:
>
> Hi All,
>
> If I run a query like this
>
> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
> table1.col1 = table2.col1")
>
> 1) Where will flink store the intermediate result? Imagine flink-conf.yaml
> says state.backend = 'rocksdb'
>
> 2) If the intermediate results are stored in rockdb then what is the key
> and value in this case(given the query above)?
>
> 3) What is the best way to query these intermediate results from an
> external application? while the job is running and while the job is not
> running?
>
> Thanks!
>
>
>


Re: Influxdb reporter not honouring the metrics scope

2020-01-21 Thread Gaurav Singhania
Thanks for the response and the fix.

On Wed, 22 Jan 2020 at 01:43, Chesnay Schepler  wrote:

> The solution for 1.9 and below is to create a customized version of the
> influx db reporter which excludes certain tags.
>
> On 21/01/2020 19:27, Yun Tang wrote:
>
> Hi, Gaurav
>
> InfluxDB metrics reporter has a fixed format of reporting metrics which
> cannot be controlled by the scope.
>
> If you don't want some tags stored, you can try to use `
> metrics.reporter..scope.variables.excludes` which was introduced in
> flink-1.10 [1], to exclude specific variables. However, there exists no
> good solution for Flink-1.9 currently.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#reporter
> Best
> Yun Tang
> --
> *From:* Gaurav Singhania  
> *Sent:* Monday, January 20, 2020 13:04
> *To:* user@flink.apache.org 
> 
> *Subject:* Influxdb reporter not honouring the metrics scope
>
> Hi,
> We are using influxdb reporter for flink 1.9 to capture our metrics. We
> want to override the scope of task metrics, however even after providing
> the config in yaml file the metrics continues to have the tags we don't
> want.
>
> The metric scope we want to change is :
> *metrics.scope.task *with a default configuration of "
> .taskmanager"
> We tried following configuration and none of them worked
> ".taskmanager..."
> ".taskmanager...constant_value."
>
> None of them worked and task_name continues to be part of the tags of the
> measurement sent by influxdb reporter.
>
> Thanks,
> Gaurav Singhania
>
>
>


Re: Re: flink on yarn job startup error: The assigned slot container_e10_1579661300080_0005_01_000002_0 was removed.

2020-01-21 Thread tison
Then check the TM log on that TM machine. From the JM side, the TM did come up successfully and registered itself at some point; look at how the TM died, or whatever else happened to it.

Best,
tison.


郑 洁锋 wrote on Wed, Jan 22, 2020 at 11:54 AM:

> The TM did not come up. The server itself has enough memory and CPU and is actually quite idle.
>
> 
> zjfpla...@hotmail.com
>
> From: tison
> Sent: 2020-01-22 11:25
> To: user-zh
> Subject: Re: flink on yarn job startup error: The assigned slot
> container_e10_1579661300080_0005_01_02_0 was removed.
> 20/01/22 11:08:49 INFO yarn.YarnResourceManager: Closing TaskExecutor
> connection container_e10_1579661300080_0005_01_02 because: The
> heartbeat of TaskManager with id container_e10_1579661300080_0005_01_02
> timed out.
>
> When you requested resources, the slot request was sent to this machine, and then its heartbeat timed out. Check whether the TM came up properly, ran out of resources, or crashed.
>
> Best,
> tison.
>
>
郑 洁锋 wrote on Wed, Jan 22, 2020 at 11:16 AM:
>
> > Hi all,
> >    When starting a flink on yarn job, we hit the error: The assigned slot
> > container_e10_1579661300080_0005_01_02_0 was removed.
> >    Environment: flink1.8.1, cdh5.14.2, kafka0.10, jdk1.8.0_241
> >
> > The flink version is 1.8.1; the logs on yarn are:
> >
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:
> >
> 
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Starting
> > YarnJobClusterEntrypoint (Version: , Rev:7297bac,
> Date:24.06.2019
> > @ 23:04:28 CST)
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  OS current user:
> > cloudera-scm
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Current
> > Hadoop/Kerberos user: root
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM: Java
> > HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.241-b07
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Maximum heap size:
> > 406 MiBytes
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JAVA_HOME:
> > /usr/java/default
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Hadoop version:
> 2.6.5
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM Options:
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xms424m
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xmx424m
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Program Arguments:
> > (none)
> > 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Classpath:
> >
> 

Re: Re: flink on yarn job startup error: The assigned slot container_e10_1579661300080_0005_01_000002_0 was removed.

2020-01-21 Thread 郑 洁锋
The TM did not come up. The server itself has enough memory and CPU and is actually quite idle.


zjfpla...@hotmail.com

From: tison
Sent: 2020-01-22 11:25
To: user-zh
Subject: Re: flink on yarn job startup error: The assigned slot
container_e10_1579661300080_0005_01_02_0 was removed.
20/01/22 11:08:49 INFO yarn.YarnResourceManager: Closing TaskExecutor
connection container_e10_1579661300080_0005_01_02 because: The
heartbeat of TaskManager with id container_e10_1579661300080_0005_01_02
timed out.

When you requested resources, the slot request was sent to this machine, and then its heartbeat timed out. Check whether the TM came up properly, ran out of resources, or crashed.

Best,
tison.


郑 洁锋 wrote on Wed, Jan 22, 2020 at 11:16 AM:

> Hi all,
>    When starting a flink on yarn job, we hit the error: The assigned slot
> container_e10_1579661300080_0005_01_02_0 was removed.
>    Environment: flink1.8.1, cdh5.14.2, kafka0.10, jdk1.8.0_241
>
> The flink version is 1.8.1; the logs on yarn are:
>
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:
> 
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Starting
> YarnJobClusterEntrypoint (Version: , Rev:7297bac, Date:24.06.2019
> @ 23:04:28 CST)
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  OS current user:
> cloudera-scm
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Current
> Hadoop/Kerberos user: root
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM: Java
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.241-b07
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Maximum heap size:
> 406 MiBytes
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JAVA_HOME:
> /usr/java/default
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Hadoop version: 2.6.5
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM Options:
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xms424m
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xmx424m
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Program Arguments:
> (none)
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Classpath:
> 

Re: flink on yarn job startup error: The assigned slot container_e10_1579661300080_0005_01_000002_0 was removed.

2020-01-21 Thread tison
20/01/22 11:08:49 INFO yarn.YarnResourceManager: Closing TaskExecutor
connection container_e10_1579661300080_0005_01_02 because: The
heartbeat of TaskManager with id container_e10_1579661300080_0005_01_02
timed out.

When you requested resources, the slot request was sent to this machine, and then its heartbeat timed out. Check whether the TM came up properly, ran out of resources, or crashed.

Best,
tison.


郑 洁锋 wrote on Wed, Jan 22, 2020 at 11:16 AM:

> Hi all,
>    When starting a flink on yarn job, we hit the error: The assigned slot
> container_e10_1579661300080_0005_01_02_0 was removed.
>    Environment: flink1.8.1, cdh5.14.2, kafka0.10, jdk1.8.0_241
>
> The flink version is 1.8.1; the logs on yarn are:
>
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:
> 
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Starting
> YarnJobClusterEntrypoint (Version: , Rev:7297bac, Date:24.06.2019
> @ 23:04:28 CST)
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  OS current user:
> cloudera-scm
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Current
> Hadoop/Kerberos user: root
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM: Java
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.241-b07
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Maximum heap size:
> 406 MiBytes
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JAVA_HOME:
> /usr/java/default
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Hadoop version: 2.6.5
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM Options:
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xms424m
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xmx424m
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Program Arguments:
> (none)
> 20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Classpath:
> 

Re: Custom Metrics outside RichFunctions

2020-01-21 Thread Yun Tang
Hi David

FlinkKafkaConsumer is in itself a RichParallelSourceFunction, and you could call the
function below to register your metrics group:

getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")
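For illustration, here is a minimal sketch (assuming the DataStream API; the sentinel
convention and all class and metric names below are made up, not part of any Flink or Kafka
API) of counting records a custom deserializer could not parse, using a RichFlatMapFunction
placed directly after the source:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

// Hypothetical setup: the custom deserializer maps records it cannot parse to a
// sentinel value instead of dropping them, and this function counts the sentinels.
public class DeserializationErrorCounter extends RichFlatMapFunction<String, String> {

    // Hypothetical sentinel emitted by the custom deserializer on failure.
    public static final String CORRUPT = "__CORRUPT__";

    private transient Counter corruptRecords;

    @Override
    public void open(Configuration parameters) {
        // Same call as above: metrics are registered through the RuntimeContext of a RichFunction.
        corruptRecords = getRuntimeContext()
                .getMetricGroup()
                .addGroup("MyMetrics")
                .counter("deserializationErrors");
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        if (CORRUPT.equals(value)) {
            corruptRecords.inc();   // count the bad record, do not forward it
        } else {
            out.collect(value);     // forward good records downstream
        }
    }
}

It would be attached right after the consumer, e.g. stream.flatMap(new DeserializationErrorCounter()).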


Best
Yun Tang

From: David Magalhães 
Sent: Tuesday, January 21, 2020 3:45
To: user 
Subject: Custom Metrics outside RichFunctions

Hi, I want to create a custom metric that shows the number of messages that
couldn't be deserialized using a custom deserializer inside FlinkKafkaConsumer.

Looking into the Metrics page (
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
) that doesn't seem to be possible, because it isn't a RichFunction.

Anyone know another way to achieve this ?

Thanks,
David


flink on yarn job startup error: The assigned slot container_e10_1579661300080_0005_01_000002_0 was removed.

2020-01-21 Thread 郑 洁锋
Hi all,
   When starting a flink on yarn job, we hit the error: The assigned slot
container_e10_1579661300080_0005_01_02_0 was removed.
   Environment: flink1.8.1, cdh5.14.2, kafka0.10, jdk1.8.0_241

The flink version is 1.8.1; the logs on yarn are:

20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: 

20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Starting 
YarnJobClusterEntrypoint (Version: , Rev:7297bac, Date:24.06.2019 @ 
23:04:28 CST)
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  OS current user: 
cloudera-scm
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Current Hadoop/Kerberos 
user: root
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM: Java HotSpot(TM) 
64-Bit Server VM - Oracle Corporation - 1.8/25.241-b07
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Maximum heap size: 406 
MiBytes
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JAVA_HOME: 
/usr/java/default
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Hadoop version: 2.6.5
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  JVM Options:
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xms424m
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint: -Xmx424m
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Program Arguments: (none)
20/01/22 11:07:53 INFO entrypoint.ClusterEntrypoint:  Classpath: 

Re: Flink configuration on Docker deployment

2020-01-21 Thread Yang Wang
Hi Soheil,

Since you are not using any container orchestration framework (e.g.
docker-compose, Kubernetes, Mesos), you need to manually update the
flink-conf.yaml in your docker images. Usually, it is located in the path
"/opt/flink/conf".
A Docker volume could also be used to override the flink configuration when
you start the jobmanager and taskmanager containers [1].

Best,
Yang

[1]. https://docs.docker.com/storage/volumes/

Soheil Pourbafrani wrote on Tue, Jan 21, 2020 at 7:46 PM:

> Hi,
>
> I need to set up a Flink cluster using docker (and not using
> docker-compose). I successfully could start the jobmanager and taskmanager,
> but the problem is I have no idea how to change the default configuration
> for them, for example to give 8 slots to the taskmanager or to
> change the memory size of both jobmanager and taskmanager.
> It would be appreciated if somebody could tell me how to change the Flink
> parameters on docker.
>
> Thanks
>


Re: Flink Job Submission Fails even though job is running

2020-01-21 Thread tison
I guess it is a JM internal error that crashes the dispatcher, or a race
condition so that the returning future never completes, possibly related to a
JDK bug. But again, I have never had a log for this case, so I cannot conclude anything.

Best,
tison.


tison wrote on Wed, Jan 22, 2020 at 10:49 AM:

> It is a known issue, reported multiple times: if you are on an early
> jdk 1.8.x version, upgrade to a later bugfix version and the issue will vanish.
>
> I have never had a log on the jm side when this issue was reported, so I'm sorry
> I am unable to explain more...
>
> Best,
> tison.
>
>
> Yang Wang wrote on Wed, Jan 22, 2020 at 10:46 AM:
>
>> The "web.timeout" will be used for all web monitor asynchronous
>> operations, including the
>> "DispatcherGateway.submitJob" in the "JobSubmitHandler".
>> So when you increase the timeout, does it still not work?
>>
>> Best,
>> Yang
>>
>> satya brat wrote on Tue, Jan 21, 2020 at 8:57 PM:
>>
>>> How does web.timeout help here? The issue is with respect to the akka
>>> dispatcher timing out. The job is submitted to the task managers but the
>>> response doesn't reach the client.
>>>
>>> On Tue, Jan 21, 2020 at 12:34 PM Yang Wang 
>>> wrote:
>>>
 Hi satya,

 Maybe the job has been submitted to the Dispatcher successfully and the
 internal job submission takes
 too long (more than 10s), so it failed with a timeout. Could you
 please set the `web.timeout: 3`
 and run again?



 Best,
 Yang

 satya brat wrote on Mon, Jan 20, 2020 at 4:34 PM:

> I am using standalone cluster of Flink with 1 jobManager and n
> taskManagers. When I try to submit a job via command line, the job
> submission fails with error message as
> org.apache.flink.client.program.ProgramInvocationException: Could not
> submit job (JobID: f839aefee74aa4483ce8f8fd2e49b69e).
>
> On jobManager instance, everything works fine till the job is switched
> from DEPLOYING to RUNNING. Post that, once akka-timeut expires, I see the
> following stacktrace
>
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher#-177004106]] after [10 ms]. 
> Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:745)
>
> I went through the flink code on github and all the steps required to
> execute a job seems to be running fine. However, when jobManager has to
> give job submission ack to flink client that triggered the job, the
> jobSubmitHandler times out on the akka dispatcher that according to my
> understanding takes care of communicating with the job client.
>
> The Flink job consists of 1 Source (kafka), 2 operators and 1
> sink(Custom Sink). Following link shows the jobManager logs:
> https://pastebin.com/raw/3GaTtNrG
>
> Once the dispatcher times out, all other Flink UI calls also timeout
> with same exception.
>
> Following are the flink client logs that is used to submit job via
> command line.
>
> 2019-09-28 19:34:21,321 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> 
> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Starting Command Line Client (Version: 1.8.0, 
> Rev:, Date:)
> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  OS current user: root
> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Current Hadoop/Kerberos user:  found>
> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle 
> Corporation - 1.8/25.5-b02
> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Maximum heap size: 2677 MiBytes
> 2019-09-28 

Re: Flink Job Submission Fails even though job is running

2020-01-21 Thread tison
It is a known issue, reported multiple times: if you are on an early jdk
1.8.x version, upgrade to a later bugfix version and the issue will vanish.

I have never had a log on the jm side when this issue was reported, so I'm sorry
I am unable to explain more...

Best,
tison.


Yang Wang wrote on Wed, Jan 22, 2020 at 10:46 AM:

> The "web.timeout" will be used for all web monitor asynchronous
> operations, including the
> "DispatcherGateway.submitJob" in the "JobSubmitHandler".
> So when you increase the timeout, does it still not work?
>
> Best,
> Yang
>
> satya brat wrote on Tue, Jan 21, 2020 at 8:57 PM:
>
>> How does web.timeout help here? The issue is with respect to the akka
>> dispatcher timing out. The job is submitted to the task managers but the
>> response doesn't reach the client.
>>
>> On Tue, Jan 21, 2020 at 12:34 PM Yang Wang  wrote:
>>
>>> Hi satya,
>>>
>>> Maybe the job has been submitted to the Dispatcher successfully and the
>>> internal job submission takes
>>> too long (more than 10s), so it failed with a timeout. Could you
>>> please set the `web.timeout: 3`
>>> and run again?
>>>
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> satya brat wrote on Mon, Jan 20, 2020 at 4:34 PM:
>>>
 I am using standalone cluster of Flink with 1 jobManager and n
 taskManagers. When I try to submit a job via command line, the job
 submission fails with error message as
 org.apache.flink.client.program.ProgramInvocationException: Could not
 submit job (JobID: f839aefee74aa4483ce8f8fd2e49b69e).

 On jobManager instance, everything works fine till the job is switched
 from DEPLOYING to RUNNING. Post that, once akka-timeut expires, I see the
 following stacktrace

 akka.pattern.AskTimeoutException: Ask timed out on 
 [Actor[akka://flink/user/dispatcher#-177004106]] after [10 ms]. 
 Sender[null] sent message of type 
 "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
 at 
 akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
 at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
 at 
 scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
 at 
 scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
 at 
 scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
 at 
 akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
 at 
 akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
 at 
 akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
 at 
 akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
 at java.lang.Thread.run(Thread.java:745)

 I went through the flink code on github and all the steps required to
 execute a job seems to be running fine. However, when jobManager has to
 give job submission ack to flink client that triggered the job, the
 jobSubmitHandler times out on the akka dispatcher that according to my
 understanding takes care of communicating with the job client.

 The Flink job consists of 1 Source (kafka), 2 operators and 1
 sink(Custom Sink). Following link shows the jobManager logs:
 https://pastebin.com/raw/3GaTtNrG

 Once the dispatcher times out, all other Flink UI calls also timeout
 with same exception.

 Following are the flink client logs that is used to submit job via
 command line.

 2019-09-28 19:34:21,321 INFO  org.apache.flink.client.cli.CliFrontend  
  - 
 
 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend  
  -  Starting Command Line Client (Version: 1.8.0, 
 Rev:, Date:)
 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend  
  -  OS current user: root
 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend  
  -  Current Hadoop/Kerberos user: >>> found>
 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend  
  -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle 
 Corporation - 1.8/25.5-b02
 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend  
  -  Maximum heap size: 2677 MiBytes
 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend  
  -  JAVA_HOME: (not set)
 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend  
  -  No Hadoop Dependency available
 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend  
  -  JVM Options:
 2019-09-28 19:34:21,323 INFO  

Re: Replacing a server in Zookeeper Quorum

2020-01-21 Thread tison
Good to know :-)

Best,
tison.


Aaron Langford wrote on Wed, Jan 22, 2020 at 10:44 AM:

> My apologies, I ended up resolving this through experimentation. AWS
> replaces master nodes with the same internal DNS names, so configurations
> need not be changed.
>
> Aaron
>
>
> On Tue, Jan 21, 2020, 6:33 PM Yang Wang  wrote:
>
>> Hi Aaron,
>>
>> I think this is not the responsibility of Flink. Flink uses the zookeeper
>> curator client to connect to the zk servers. If
>> multiple zk servers are specified, it has an automatic retry mechanism.
>> However, your problem is that the ip address will change when the EMR instance
>> restarts. Currently, Flink
>> does not support dynamically reloading configuration. One quick solution is
>> to use a static ip for the EMR
>> master node [1].
>>
>>
>> Best,
>> Yang
>>
>>
>> [1].
>> https://aws.amazon.com/premiumsupport/knowledge-center/static-private-ip-master-node-emr/?nc1=h_ls
>>
>> Aaron Langford wrote on Wed, Jan 22, 2020 at 1:48 AM:
>>
>>> Hello Flink Community,
>>>
>>> I'm working on a HA setup of Flink 1.8.1 on AWS EMR and have some
>>> questions about how Flink interacts with Zookeeper when one of the servers
>>> in the quorum specified in flink-conf.yaml goes down and is replaced by a
>>> machine with a new IP address.
>>>
>>> Currently, I configure high-availability.zookeeper.quorum to be the IP
>>> addresses of the 3 master nodes of the EMR cluster, as this is what AWS
>>> does to enable a highly available YARN setup.
>>>
>>> EMR master nodes may go down entirely and need to be replaced by a
>>> machine with a different instance IP address. I will almost certainly need
>>> to perform a rolling configuration update to account for this. But will I
>>> need to restart flink for this to take effect? Is there a way to
>>> dynamically reload these configs when they change?
>>>
>>> Aaron
>>>
>>


Re: Flink Job Submission Fails even though job is running

2020-01-21 Thread Yang Wang
The "web.timeout" will be used for all web monitor asynchronous operations,
including the
"DispatcherGateway.submitJob" in the "JobSubmitHandler".
So when you increase the timeout, does it still could not work?

Best,
Yang

satya brat wrote on Tue, Jan 21, 2020 at 8:57 PM:

> How does web.timeout help here? The issue is with respect to the akka
> dispatcher timing out. The job is submitted to the task managers but the
> response doesn't reach the client.
>
> On Tue, Jan 21, 2020 at 12:34 PM Yang Wang  wrote:
>
>> Hi satya,
>>
>> Maybe the job has been submitted to the Dispatcher successfully and the
>> internal job submission takes
>> too long (more than 10s), so it failed with a timeout. Could you please
>> set the `web.timeout: 3`
>> and run again?
>>
>>
>>
>> Best,
>> Yang
>>
>> satya brat wrote on Mon, Jan 20, 2020 at 4:34 PM:
>>
>>> I am using standalone cluster of Flink with 1 jobManager and n
>>> taskManagers. When I try to submit a job via command line, the job
>>> submission fails with error message as
>>> org.apache.flink.client.program.ProgramInvocationException: Could not
>>> submit job (JobID: f839aefee74aa4483ce8f8fd2e49b69e).
>>>
>>> On jobManager instance, everything works fine till the job is switched
>>> from DEPLOYING to RUNNING. Post that, once akka-timeut expires, I see the
>>> following stacktrace
>>>
>>> akka.pattern.AskTimeoutException: Ask timed out on 
>>> [Actor[akka://flink/user/dispatcher#-177004106]] after [10 ms]. 
>>> Sender[null] sent message of type 
>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>> at 
>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>>> at 
>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>> at 
>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>> at 
>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>>> at 
>>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> I went through the flink code on github and all the steps required to
>>> execute a job seems to be running fine. However, when jobManager has to
>>> give job submission ack to flink client that triggered the job, the
>>> jobSubmitHandler times out on the akka dispatcher that according to my
>>> understanding takes care of communicating with the job client.
>>>
>>> The Flink job consists of 1 Source (kafka), 2 operators and 1
>>> sink(Custom Sink). Following link shows the jobManager logs:
>>> https://pastebin.com/raw/3GaTtNrG
>>>
>>> Once the dispatcher times out, all other Flink UI calls also timeout
>>> with same exception.
>>>
>>> Following are the flink client logs that is used to submit job via
>>> command line.
>>>
>>> 2019-09-28 19:34:21,321 INFO  org.apache.flink.client.cli.CliFrontend   
>>> - 
>>> 
>>> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend   
>>> -  Starting Command Line Client (Version: 1.8.0, 
>>> Rev:, Date:)
>>> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend   
>>> -  OS current user: root
>>> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend   
>>> -  Current Hadoop/Kerberos user: >> found>
>>> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend   
>>> -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle 
>>> Corporation - 1.8/25.5-b02
>>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend   
>>> -  Maximum heap size: 2677 MiBytes
>>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend   
>>> -  JAVA_HOME: (not set)
>>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend   
>>> -  No Hadoop Dependency available
>>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend   
>>> -  JVM Options:
>>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend   
>>> - 
>>> -Dlog.file=/var/lib/fulfillment-stream-processor/flink-executables/flink-executables/log/flink-root-client-fulfillment-stream-processor-flink-task-manager-2-8047357.log
>>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend   
>>> - 
>>> 

Re: Replacing a server in Zookeeper Quorum

2020-01-21 Thread Aaron Langford
My apologies, I ended up resolving this through experimentation. AWS
replaces master nodes with the same internal DNS names, so configurations
need not be changed.

Aaron


On Tue, Jan 21, 2020, 6:33 PM Yang Wang  wrote:

> Hi Aaron,
>
> I think this is not the responsibility of Flink. Flink uses the zookeeper
> curator client to connect to the zk servers. If
> multiple zk servers are specified, it has an automatic retry mechanism.
> However, your problem is that the ip address will change when the EMR instance
> restarts. Currently, Flink
> does not support dynamically reloading configuration. One quick solution is
> to use a static ip for the EMR
> master node [1].
>
>
> Best,
> Yang
>
>
> [1].
> https://aws.amazon.com/premiumsupport/knowledge-center/static-private-ip-master-node-emr/?nc1=h_ls
>
> Aaron Langford wrote on Wed, Jan 22, 2020 at 1:48 AM:
>
>> Hello Flink Community,
>>
>> I'm working on a HA setup of Flink 1.8.1 on AWS EMR and have some
>> questions about how Flink interacts with Zookeeper when one of the servers
>> in the quorum specified in flink-conf.yaml goes down and is replaced by a
>> machine with a new IP address.
>>
>> Currently, I configure high-availability.zookeeper.quorum to be the IP
>> addresses of the 3 master nodes of the EMR cluster, as this is what AWS
>> does to enable a highly available YARN setup.
>>
>> EMR master nodes may go down entirely and need to be replaced by a
>> machine with a different instance IP address. I will almost certainly need
>> to perform a rolling configuration update to account for this. But will I
>> need to restart flink for this to take effect? Is there a way to
>> dynamically reload these configs when they change?
>>
>> Aaron
>>
>


Re: Replacing a server in Zookeeper Quorum

2020-01-21 Thread tison
I second Yang that setting a static ip for the EMR master node would be a
workaround.

Even in the ZooKeeper world, reconfig is a new and immature feature (since 3.5.3),
while Flink uses ZooKeeper 3.4.x. It would be a breaking change if we "just"
upgraded the zk version, but hopefully the Flink community keeps digging out a safe
upgrade path.

Best,
tison.


Yang Wang wrote on Wed, Jan 22, 2020 at 10:34 AM:

> Hi Aaron,
>
> I think this is not the responsibility of Flink. Flink uses the zookeeper
> curator client to connect to the zk servers. If
> multiple zk servers are specified, it has an automatic retry mechanism.
> However, your problem is that the ip address will change when the EMR instance
> restarts. Currently, Flink
> does not support dynamically reloading configuration. One quick solution is
> to use a static ip for the EMR
> master node [1].
>
>
> Best,
> Yang
>
>
> [1].
> https://aws.amazon.com/premiumsupport/knowledge-center/static-private-ip-master-node-emr/?nc1=h_ls
>
> Aaron Langford wrote on Wed, Jan 22, 2020 at 1:48 AM:
>
>> Hello Flink Community,
>>
>> I'm working on a HA setup of Flink 1.8.1 on AWS EMR and have some
>> questions about how Flink interacts with Zookeeper when one of the servers
>> in the quorum specified in flink-conf.yaml goes down and is replaced by a
>> machine with a new IP address.
>>
>> Currently, I configure high-availability.zookeeper.quorum to be the IP
>> addresses of the 3 master nodes of the EMR cluster, as this is what AWS
>> does to enable a highly available YARN setup.
>>
>> EMR master nodes may go down entirely and need to be replaced by a
>> machine with a different instance IP address. I will almost certainly need
>> to perform a rolling configuration update to account for this. But will I
>> need to restart flink for this to take effect? Is there a way to
>> dynamically reload these configs when they change?
>>
>> Aaron
>>
>


Re: Replacing a server in Zookeeper Quorum

2020-01-21 Thread Yang Wang
Hi Aaron,

I think this is not the responsibility of Flink. Flink uses the zookeeper curator
client to connect to the zk servers. If
multiple zk servers are specified, it has an automatic retry mechanism.
However, your problem is that the ip address will change when the EMR instance
restarts. Currently, Flink
does not support dynamically reloading configuration. One quick solution is to
use a static ip for the EMR
master node [1].


Best,
Yang


[1].
https://aws.amazon.com/premiumsupport/knowledge-center/static-private-ip-master-node-emr/?nc1=h_ls

Aaron Langford wrote on Wed, Jan 22, 2020 at 1:48 AM:

> Hello Flink Community,
>
> I'm working on a HA setup of Flink 1.8.1 on AWS EMR and have some
> questions about how Flink interacts with Zookeeper when one of the servers
> in the quorum specified in flink-conf.yaml goes down and is replaced by a
> machine with a new IP address.
>
> Currently, I configure high-availability.zookeeper.quorum to be the IP
> addresses of the 3 master nodes of the EMR cluster, as this is what AWS
> does to enable a highly available YARN setup.
>
> EMR master nodes may go down entirely and need to be replaced by a machine
> with a different instance IP address. I will almost certainly need to
> perform a rolling configuration update to account for this. But will I need
> to restart flink for this to take effect? Is there a way to dynamically
> reload these configs when they change?
>
> Aaron
>


Re: java.lang.StackOverflowError

2020-01-21 Thread 刘建刚
I am using flink 1.6.2 on yarn. State backend is rocksdb. 

> On Jan 22, 2020, at 10:15 AM, 刘建刚 wrote:
> 
>   I have a flink job which fails occasionally. I am eager to avoid this 
> problem. Can anyone help me? The error stacktrace is as following:
> java.io.IOException: java.lang.StackOverflowError
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:191)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:194)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:589)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:546)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:175)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:236)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:754)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.StackOverflowError
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.notifyChannelNonEmpty(SingleInputGate.java:656)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.notifyChannelNonEmpty(InputChannel.java:125)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.setError(InputChannel.java:203)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:403)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
> 

java.lang.StackOverflowError

2020-01-21 Thread 刘建刚
  I have a flink job which fails occasionally. I am eager to avoid this
problem. Can anyone help me? The error stacktrace is as following:

java.io.IOException: java.lang.StackOverflowError
at 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:191)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:194)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:589)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:546)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:175)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:236)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:754)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.StackOverflowError
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.notifyChannelNonEmpty(SingleInputGate.java:656)
at 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.notifyChannelNonEmpty(InputChannel.java:125)
at 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.setError(InputChannel.java:203)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:403)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-21 Thread Jark Wu
Hi Kant,

1) Yes, it will be stored in rocksdb statebackend. 
2) In the old planner, the left state is the same as the right state; both are
`>>`.
It is a 2-level map structure: the `col1` is the join key and the
first-level key of the state. The key of the MapState is the input row,
the `count` is the number of occurrences of this row, and the expiredTime indicates when
to clean up this row (to avoid infinite state size). You can find the source code
here [1].
In the blink planner, the state structure will be more complex; it is
determined by the meta-information of the upstream. You can see the source code of
the blink planner here [2].
3) Currently, the intermediate state is not exposed to users. Usually, users
should write the query result to an external system (like Mysql) and query the
external system.
Query on the intermediate state is on the roadmap, but I guess it is not in
the 1.11 plan.
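
As a rough illustration only (this is not Flink's internal code; the class and state
names below are made up), the two-level layout for one input side could be sketched
like this in a keyed two-input function, where the operator key is the join key `col1`:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class JoinStateSketch extends KeyedCoProcessFunction<Row, Row, Row, Row> {

    // Scoped per join key (the operator key): map key = the full left input row,
    // map value = (count of identical rows, expiration timestamp used for cleanup).
    private transient MapState<Row, Tuple2<Long, Long>> leftRows;

    @Override
    public void open(Configuration parameters) {
        leftRows = getRuntimeContext().getMapState(
                new MapStateDescriptor<>(
                        "left-rows",
                        TypeInformation.of(Row.class),
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})));
    }

    @Override
    public void processElement1(Row left, Context ctx, Collector<Row> out) throws Exception {
        // Increment the count for this exact row and refresh its expiration time.
        Tuple2<Long, Long> entry = leftRows.get(left);
        long count = entry == null ? 0L : entry.f0;
        long expire = ctx.timerService().currentProcessingTime() + 3600_000L;
        leftRows.put(left, Tuple2.of(count + 1, expire));
        // A real join would now probe the symmetric right-side state and emit matches.
    }

    @Override
    public void processElement2(Row right, Context ctx, Collector<Row> out) {
        // The symmetric handling of the right side is omitted in this sketch.
    }
}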

Best,
Jark

[1]: 
http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
 

[2]: 
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
 



> On Jan 21, 2020, at 18:01, kant kodali wrote:
> 
> Hi All,
> 
> If I run a query like this
> 
> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on 
> table1.col1 = table2.col1")
> 
> 1) Where will flink store the intermediate result? Imagine flink-conf.yaml 
> says state.backend = 'rocksdb'
> 
> 2) If the intermediate results are stored in rockdb then what is the key and 
> value in this case(given the query above)?
> 
> 3) What is the best way to query these intermediate results from an external 
> application? while the job is running and while the job is not running?
> 
> Thanks!



Re: Influxdb reporter not honouring the metrics scope

2020-01-21 Thread Chesnay Schepler
The solution for 1.9 and below is to create a customized version of the 
influx db reporter which excludes certain tags.


On 21/01/2020 19:27, Yun Tang wrote:

Hi, Gaurav

InfluxDB metrics reporter has a fixed format of reporting metrics 
which cannot be controlled by the scope.


If you don't want some tags stored, you can try to use
`metrics.reporter..scope.variables.excludes`, which was introduced
in flink-1.10 [1], to exclude specific variables. However, there
exists no good solution for Flink-1.9 currently.



[1] 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#reporter

Best
Yun Tang

*From:* Gaurav Singhania 
*Sent:* Monday, January 20, 2020 13:04
*To:* user@flink.apache.org 
*Subject:* Influxdb reporter not honouring the metrics scope
Hi,
We are using influxdb reporter for flink 1.9 to capture our metrics. 
We want to override the scope of task metrics, however even after 
providing the config in yaml file the metrics continues to have the 
tags we don't want.


The metric scope we want to change is :
*metrics.scope.task *with a default configuration of 
".taskmanager"

We tried following configuration and none of them worked
".taskmanager..."
".taskmanager...constant_value."

None of them worked and task_name continues to be part of the tags of 
the measurement sent by influxdb reporter.


Thanks,
Gaurav Singhania





Re: Influxdb reporter not honouring the metrics scope

2020-01-21 Thread Yun Tang
Hi, Gaurav

InfluxDB metrics reporter has a fixed format of reporting metrics which cannot 
be controlled by the scope.

If you don't want some tags stored, you can try to use
`metrics.reporter..scope.variables.excludes`, which was introduced in
flink-1.10 [1], to exclude specific variables. However, there exists no good
solution for Flink-1.9 currently.


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#reporter
Best
Yun Tang

From: Gaurav Singhania 
Sent: Monday, January 20, 2020 13:04
To: user@flink.apache.org 
Subject: Influxdb reporter not honouring the metrics scope

Hi,
We are using influxdb reporter for flink 1.9 to capture our metrics. We want to 
override the scope of task metrics, however even after providing the config in 
yaml file the metrics continues to have the tags we don't want.

The metric scope we want to change is :
metrics.scope.task with a default configuration of 
".taskmanager"
We tried following configuration and none of them worked
".taskmanager..."
".taskmanager...constant_value."

None of them worked and task_name continues to be part of the tags of the 
measurement sent by influxdb reporter.

Thanks,
Gaurav Singhania


Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-21 Thread Aaron Langford
Senthil,

One of the key steps in debugging this for me was enabling debug level logs
on my cluster, and then looking at the logs in the resource manager. The
failure you are after happens before the exceptions you have reported here.
When your Flink application is starting, it will attempt to load various
file system implementations. You can see which ones it successfully loaded
when you have the debug level of logs configured. You will have to do some
digging, but this is a good place to start. Try to discover if your
application is indeed loading the s3 file system, or if that is not
happening. You should be able to find the file system implementations that
were loaded by searching for the string "Added file system".

Also, do you mind sharing the bootstrap action script that you are using to
get the s3 file system in place?

Aaron

On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar  wrote:

> Yang, I appreciate your help! Please let me know if I can provide any
> other info.
>
>
>
> I resubmitted my executable jar file as a step to the flink EMR and here
> are all the exceptions. I see two of them.
>
>
>
> I fished them out of /var/log/Hadoop//syslog
>
>
>
> 2020-01-21 16:31:37,587 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom
> File Source -> Sink: Unnamed (11/16)): Error during di
>
> sposal of stream operator.
>
> java.lang.NullPointerException
>
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> 2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task
> (Split Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader:
> Custom File Source -> Sink: Unnamed (8/16)
> (865a5a078c3e40cbe2583afeaa0c601e) switched from RUNNING to FAILED.
>
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
> are only supported for HDFS and for Hadoop version 2.7 or newer
>
> at
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57)
>
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
> *From: *Yang Wang 
> *Date: *Saturday, January 18, 2020 at 7:58 PM
> *To: *Senthil Kumar 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)
>
>
>
> I think this exception is not because the hadoop version isn't high
> enough. It seems that the "s3" URI scheme could not be recognized by
> `S3FileSystemFactory`, so it falls back to the `HadoopFsFactory`.
>
>
>
> Could you share the debug-level jobmanager/taskmanager logs so that we
> could confirm whether the classpath and FileSystem are loaded correctly?
>
>
>
>
>
>
>
> Best,
>
> Yang
>
>
>
> Senthil Kumar  wrote on Friday, January 17, 2020 at 10:57 PM:
>
> Hello all,
>
>
>
> Newbie here!
>
>
>
> We are running in Amazon EMR with the following installed in the EMR
> Software Configuration
>
> Hadoop 2.8.5
>
> JupyterHub 1.0.0
>
> Ganglia 3.7.2
>
> Hive 2.3.6
>
> Flink 1.9.0
>
>
>
> I am trying to get a 

Replacing a server in Zookeeper Quorum

2020-01-21 Thread Aaron Langford
Hello Flink Community,

I'm working on a HA setup of Flink 1.8.1 on AWS EMR and have some questions
about how Flink interacts with Zookeeper when one of the servers in the
quorum specified in flink-conf.yaml goes down and is replaced by a machine
with a new IP address.

Currently, I configure high-availability.zookeeper.quorum to be the IP
addresses of the 3 master nodes of the EMR cluster, as this is what AWS
does to enable a highly available YARN setup.

EMR master nodes may go down entirely and need to be replaced by a machine
with a different instance IP address. I will almost certainly need to
perform a rolling configuration update to account for this. But will I need
to restart flink for this to take effect? Is there a way to dynamically
reload these configs when they change?
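
For reference, a hedged flink-conf.yaml sketch (hostnames are hypothetical): pointing the quorum at stable DNS names rather than instance IPs is one way to avoid editing the quorum entry when a master is replaced, assuming the replacement node reclaims the same DNS name:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1.emr.internal:2181,zk2.emr.internal:2181,zk3.emr.internal:2181
high-availability.storageDir: hdfs:///flink/recovery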

Aaron


Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-21 Thread Senthil Kumar
Yang, I appreciate your help! Please let me know if I can provide any 
other info.

I resubmitted my executable jar file as a step to the flink EMR and here are 
all the exceptions. I see two of them.

I fished them out of /var/log/Hadoop//syslog


2020-01-21 16:31:37,587 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom File 
Source -> Sink: Unnamed (11/16)): Error during disposal of stream operator.

java.lang.NullPointerException

at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

at java.lang.Thread.run(Thread.java:748)



2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task (Split 
Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader: Custom File 
Source -> Sink: Unnamed (8/16) (865a5a078c3e40cbe2583afeaa0c601e) switched from 
RUNNING to FAILED.

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only 
supported for HDFS and for Hadoop version 2.7 or newer

at 
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57)

at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)

at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)

at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)

at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

at java.lang.Thread.run(Thread.java:748)


From: Yang Wang 
Date: Saturday, January 18, 2020 at 7:58 PM
To: Senthil Kumar 
Cc: "user@flink.apache.org" 
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

I think this exception is not because the hadoop version isn't high enough.
It seems that the "s3" URI scheme could not be recognized by
`S3FileSystemFactory`, so it falls back to the `HadoopFsFactory`.

Could you share the debug-level jobmanager/taskmanager logs so that we could
confirm whether the classpath and FileSystem are loaded correctly?
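
For reference, a minimal diagnostic sketch (not from the thread; the bucket path is hypothetical) that prints which FileSystem implementation Flink resolves for the s3 scheme when run with the same classpath and plugins as the job. If it prints org.apache.flink.runtime.fs.hdfs.HadoopFileSystem, or fails with an unsupported-scheme error, the flink-s3-fs-hadoop factory was probably not picked up:

import java.net.URI;
import org.apache.flink.core.fs.FileSystem;

public final class S3SchemeProbe {
    public static void main(String[] args) throws Exception {
        // Resolves the file system registered for the "s3" scheme, the same way a job would.
        FileSystem fs = FileSystem.get(new URI("s3://some-bucket/some-key"));
        System.out.println("s3 resolved to: " + fs.getClass().getName());
    }
}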



Best,
Yang

Senthil Kumar <senthi...@vmware.com> wrote on Friday, January 17, 2020 at 10:57 PM:

Hello all,



Newbie here!



We are running in Amazon EMR with the following installed in the EMR Software 
Configuration

Hadoop 2.8.5

JupyterHub 1.0.0

Ganglia 3.7.2

Hive 2.3.6

Flink 1.9.0



I am trying to get a Streaming job from one S3 bucket into another S3 bucket 
using the StreamingFileSink



I got the infamous exception:

Caused by: java.lang.UnsupportedOperationException: Recoverable writers on 
Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer



According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in 
/usr/lib/flink/lib

https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov



That did not work.



Further googling revealed, for Flink 1.9.0 and above (according to this):


Re: How to get Task metrics with StatsD metric reporter?

2020-01-21 Thread John Smith
I think I figured it out. I used netcat to debug. I think the Telegraf
StatsD server doesn't support spaces in the stats names.
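
For reference, a hedged sketch of one workaround: give operators and sinks explicit names without spaces, so the task part of the metric identifier Flink reports to StatsD contains none (all names and the toy pipeline below are illustrative, not from the thread):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NoSpacesInOperatorNames {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("a", "b", "c");

        input
            .map(value -> value.toUpperCase())
            .name("uppercase_mapper")      // explicit operator name without spaces
            .print()
            .name("stdout_sink");          // sink name without spaces
        env.execute("statsd_name_demo");   // job name without spaces, too
    }
}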

On Mon, 20 Jan 2020 at 12:19, John Smith  wrote:

> Hi, running Flink 1.8
>
> I'm declaring my metric as such.
>
> invalidList = getRuntimeContext()
>   .getMetricGroup()
>   .addGroup("MyMetrics")
>   .meter("invalidList", new DropwizardMeterWrapper(new 
> com.codahale.metrics.Meter()));
>
> Then in my code I call.
>
> invalidList.markEvent();
>
>
> On the task nodes I enabled the Influx Telegraf StatsD server. And I
> enabled the task node with.
>
> metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
> metrics.reporter.stsd.host: localhost
> metrics.reporter.stsd.port: 8125
>
> The metrics are being pushed to Elasticsearch. So far I only see the
> Status_JVM_* metrics.
>
> Do the task specific metrics come from the Job nodes? I have not enabled
> reporting on the Job nodes yet.


Re: Question regarding checkpoint/savepoint and State Processor API

2020-01-21 Thread Jin Yi
Hi Seth,

Thanks for the prompt response! Regarding my second question, once I have
converted the existing savepoint to a dataset, how can I convert the dataset
into BroadcastState?

For example, in my BroadcastProcessFunction:

@Override
public void processBroadcastElement(String key, Context context,
    Collector<String> collector) throws Exception {

  // TODO: how to add existing BroadcastState from savepoint beforehand?
  BroadcastState<String, String> broadcastState =
      context.getBroadcastState(keySetStateDescriptor);
  broadcastState.put(key, key);
}


Thanks a lot!
Eleanore
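
For reference, a hedged sketch of bootstrapping broadcast state directly from a plain DataSet (no prior savepoint needed), assuming the State Processor API's BroadcastStateBootstrapFunction and OperatorTransformation from flink-state-processor-api; keySetStateDescriptor, OPERATOR_UID and BROADCAST_PARALLELISM are names taken from the snippets in this thread, everything else is illustrative:

// env is a DataSet ExecutionEnvironment; any DataSet works as the seed.
MapStateDescriptor<String, String> keySetStateDescriptor =
    new MapStateDescriptor<>("keySet", Types.STRING, Types.STRING);

DataSet<String> seedKeys = env.fromElements("key-a", "key-b");

BootstrapTransformation<String> transform = OperatorTransformation
    .bootstrapWith(seedKeys)
    .transform(new BroadcastStateBootstrapFunction<String>() {
        @Override
        public void processElement(String key, Context ctx) throws Exception {
            ctx.getBroadcastState(keySetStateDescriptor).put(key, key);
        }
    });

Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
    .withOperator(OPERATOR_UID, transform)
    .write("file:///tmp/new_savepoints");

The streaming job would then be started from the written savepoint, and getBroadcastState(keySetStateDescriptor) inside the BroadcastProcessFunction should see the bootstrapped entries.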

On Tue, Jan 21, 2020 at 7:12 AM Seth Wiesman  wrote:

> Hi Eleanore,
>
> Bootstrap data is not required to come from an existing savepoint. It can
> come from any DataSet which could be backed by a file, database, or any
> other system. The state processor api is also not a tool you are going to
> use between every start and stop of your job. It is just to bootstrap the
> initial state of your application. After that, you will use savepoints to
> carry over the current state of your applications between runs.
>
>
>
> On Mon, Jan 20, 2020 at 6:07 PM Jin Yi  wrote:
>
>> Hi there,
>>
>> 1. In my job, I have a broadcast stream; initially there is no savepoint
>> that can be used as bootstrap values for the broadcast stream states.
>>   BootstrapTransformation transform =
>> OperatorTransformation.bootstrapWith(dataSet).transform(bootstrapFunction);
>>
>> Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
>> .withOperator(OPERATOR_UID, transform)
>> .write("file:///tmp/new_savepoints");*/
>>
>> Question: bootstrapWith(dataSet) is required; normally the dataSet comes
>> from the old savepoint, but in this case I don't have one. How should I deal
>> with it? Or is it strictly required?
>>
>> 2. As messages come through the broadcast stream, the state gets updated
>>
>> 3. I would like to periodically save the broadcast state to a file via
>> savepoints
>> Savepoint.create(new MemoryStateBackend(), BROADCAST_PARALLELISM)
>> .withOperator(OPERATOR_UID, transform)
>> .write("file:///tmp/new_savepoints");
>>
>> 4. When the job gets cancelled and later restarted, the initial broadcast
>> state can be loaded from the previous savepoint.
>>
>> ExistingSavepoint existingSavepoint = Savepoint.load(environment, 
>> "file:///tmp/smarts/checkpoints/85b69cb38897b9ac66a925fee4ecea2c/chk-5", new 
>> MemoryStateBackend());
>>
>> dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, OPERATOR_NAME, 
>> BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
>>
>> Question: now assume I got the old state as a dataSet; how can I use it in the 
>> BroadcastProcessFunction as the initial state of the broadcast state?
>>
>> Thanks a lot for the help!
>>
>> Eleanore
>>
>
>
> --
>
> Seth Wiesman | Solutions Architect
>
> +1 314 387 1463
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>



Call for presentations for ApacheCon North America 2020 now open

2020-01-21 Thread Rich Bowen

Dear Apache enthusiast,

(You’re receiving this message because you are subscribed to one or more 
project mailing lists at the Apache Software Foundation.)


The call for presentations for ApacheCon North America 2020 is now open 
at https://apachecon.com/acna2020/cfp


ApacheCon will be held at the Sheraton, New Orleans, September 28th 
through October 2nd, 2020.


As in past years, ApacheCon will feature tracks focusing on the various 
technologies within the Apache ecosystem, and so the call for 
presentations will ask you to select one of those tracks, or “General” 
if the content falls outside of one of our already-organized tracks. 
These tracks are:


Karaf
Internet of Things
Fineract
Community
Content Delivery
Solr/Lucene (Search)
Gobblin/Big Data Integration
Ignite
Observability
Cloudstack
Geospatial
Graph
Camel/Integration
Flagon
Tomcat
Cassandra
Groovy
Web/httpd
General/Other

The CFP will close Friday, May 1, 2020 8:00 AM (America/New_York time).

Submit early, submit often, at https://apachecon.com/acna2020/cfp

Rich, for the ApacheCon Planners


GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-21 Thread Mark Harris
Hi,

We're using flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs hadoop v 
"Amazon 2.8.5". We've recently noticed that some TaskManagers fail (causing all 
the jobs running on them to fail) with a "java.lang.OutOfMemoryError: GC 
overhead limit exceeded". The taskmanager (and jobs that should be running on 
it) remain down until manually restarted.

I managed to take and analyze a memory dump from one of the afflicted 
taskmanagers.

It showed that 85% of the heap was made up of the 
java.io.DeleteOnExitHook.files hashset. The majority of the strings in that 
hashset (9041060 out of ~9041100) pointed to files whose names began with 
/tmp/hadoop-yarn/s3a/s3ablock
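
For reference, a hedged diagnostic sketch (not a fix) for confirming this in a running JVM: it counts the entries registered in java.io.DeleteOnExitHook via reflection, assuming an OpenJDK/Oracle JDK 8 runtime where the hook keeps its pending paths in a private static LinkedHashSet field named "files":

import java.lang.reflect.Field;
import java.util.LinkedHashSet;

public final class DeleteOnExitProbe {
    @SuppressWarnings("unchecked")
    public static int pendingDeleteOnExitFiles() throws Exception {
        Class<?> hook = Class.forName("java.io.DeleteOnExitHook");
        Field files = hook.getDeclaredField("files");   // private static LinkedHashSet<String>
        files.setAccessible(true);
        synchronized (hook) {
            return ((LinkedHashSet<String>) files.get(null)).size();
        }
    }
}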

The problem seems to affect jobs that make use of the StreamingFileSink - all 
of the taskmanager crashes have been on a taskmanager running at least one job 
using this sink, and a cluster running only a single taskmanager / job that 
uses the StreamingFileSink crashed with the GC overhead limit exceeded error.

I've had a look for advice on handling this error more broadly without luck.

Any suggestions or advice gratefully received.

Best regards,

Mark Harris





Re: Flink Performance

2020-01-21 Thread Dharani Sudharsan
Thanks David.

But I don’t see any solutions provided for the same.

On Jan 21, 2020, at 7:13 PM, David Magalhães <speeddra...@gmail.com> wrote:

I've found this ( 
https://stackoverflow.com/questions/50580756/flink-window-dragged-stream-performance
 ) post on StackOverflow, where someone complains about performance drop in 
KeyBy.

On Tue, Jan 21, 2020 at 1:24 PM Dharani Sudharsan <dharani.sudhar...@outlook.in> wrote:
Hi All,

Currently, I’m running a flink streaming application, the configuration below.

Task slots: 45
Task Managers: 3
Job Manager: 1
Cpu : 20 per machine

My sample code below:

Process Stream:  datastream.flatmap().map().process().addsink

Data size: 330GB approx.

Raw Stream: datastream.keyby.window.addsink

When I run the raw stream, Kafka source is reading data in GB and it is able to 
read 330GB in 15m.

But when I run the Process stream, back pressure is noticed, the source reads 
data in MBs, and there is a huge impact on performance. 

I’m using file state backend with checkpointing enabled.

I tried debugging the issues. I made some changes to the code like below.

Datastream.keyby.timewindow.reduce.flatmap.keyby.timewindow.reduce.map.keyby.process.addsink

This time the performance was slightly improved but still not good, and I 
noticed memory leaks which caused Task Managers to go down and the job to get 
terminated.


Any help would be much appreciated.

Thanks,
Dharani.








Re: Flink Performance

2020-01-21 Thread David Magalhães
I've found this (
https://stackoverflow.com/questions/50580756/flink-window-dragged-stream-performance
)
post on StackOverflow, where someone complains about performance drop in
KeyBy.

On Tue, Jan 21, 2020 at 1:24 PM Dharani Sudharsan <
dharani.sudhar...@outlook.in> wrote:

> Hi All,
>
> Currently, I’m running a flink streaming application, the configuration
> below.
>
> Task slots: 45
> Task Managers: 3
> Job Manager: 1
> Cpu : 20 per machine
>
> My sample code below:
>
> Process Stream:  datastream.flatmap().map().process().addsink
>
> Data size: 330GB approx.
>
> Raw Stream: datastream.keyby.window.addsink
>
> When I run the raw stream, Kafka source is reading data in GB and it is
> able to read 330GB in 15m.
>
> But when I run the Process stream, back pressure is noticed, the source
> reads data in MBs, and there is a huge impact on performance.
>
> I’m using file state backend with checkpointing enabled.
>
> I tried debugging the issues. I made some changes to the code like below.
>
>
> Datastream.keyby.timewindow.reduce.flatmap.keyby.timewindow.reduce.map.keyby.process.addsink
>
> This time the performance was slightly improved but still not good, and I
> noticed memory leaks which caused Task Managers to go down and the job to
> get terminated.
>
>
> Any help would be much appreciated.
>
> Thanks,
> Dharani.
>
>
>
>
>
>


Flink Performance

2020-01-21 Thread Dharani Sudharsan
Hi All,

Currently, I’m running a flink streaming application, the configuration below.

Task slots: 45
Task Managers: 3
Job Manager: 1
Cpu : 20 per machine

My sample code below:

Process Stream:  datastream.flatmap().map().process().addsink

Data size: 330GB approx.

Raw Stream: datastream.keyby.window.addsink

When I run the raw stream, Kafka source is reading data in GB and it is able to 
read 330GB in 15m. 

But when I run the Process stream, back pressure is noticed, the source reads 
data in MBs, and there is a huge impact on performance. 

I’m using file state backend with checkpointing enabled.

I tried debugging the issues. I made some changes to the code like below.

Datastream.keyby.timewindow.reduce.flatmap.keyby.timewindow.reduce.map.keyby.process.addsink

This time the performance was slightly improved but still not good, and I 
noticed memory leaks which caused Task Managers to go down and the job to get 
terminated.


Any help would be much appreciated.

Thanks,
Dharani.







Re: Re: Is there any difference between CountEvictor and TriggerResult.FIRE_AND_PURGE when clearing window data?

2020-01-21 Thread tison
Take a look at the EvictingWindowOperator code, or rather the call chain of Evictor#evictBefore;
the handling of the window state there is fairly hacky, and prose alone would not describe it concisely:

private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents,
        ListState<StreamRecord<IN>> windowState) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
        .from(contents)
        .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
            @Override
            public TimestampedValue<IN> apply(StreamRecord<IN> input) {
                return TimestampedValue.from(input);
            }
        });
    evictorContext.evictBefore(recordsWithTimestamp,
        Iterables.size(recordsWithTimestamp));

    FluentIterable<IN> projectedContents = recordsWithTimestamp
        .transform(new Function<TimestampedValue<IN>, IN>() {
            @Override
            public IN apply(TimestampedValue<IN> input) {
                return input.getValue();
            }
        });

    processContext.window = triggerContext.window;
    userFunction.process(triggerContext.key, triggerContext.window,
        processContext, projectedContents, timestampedCollector);
    evictorContext.evictAfter(recordsWithTimestamp,
        Iterables.size(recordsWithTimestamp));

    // work around to fix FLINK-4369, remove the evicted elements from the windowState.
    // this is inefficient, but there is no other way to remove elements
    // from ListState, which is an AppendingState.
    windowState.clear();
    for (TimestampedValue<IN> record : recordsWithTimestamp) {
        windowState.add(record.getStreamRecord());
    }
}

Best,
tison.


USERNAME  wrote on Tuesday, January 21, 2020 at 8:25 PM:

> Will the data dropped by the evictor also be removed from memory or RocksDB?
>
>
>
>
>
>
> On 2020-01-21 17:27:38, "tison" wrote:
> >I happened to be looking at this part; yes, there is a difference. Consider a sliding count window.
> >
> >[1] drops the entire windowState after firing, while [2] actually recomputes the windowState after eviction.
> >
> >Best,
> >tison.
> >
> >
> >USERNAME  wrote on Tuesday, January 21, 2020 at 5:21 PM:
> >
> >> Happy New Year, everyone~
> >>
> >>
> >> [1] TriggerResult.FIRE_AND_PURGE
> >>
> >>
> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74
> >> [2] CountEvictor
> >>
> >>
> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
> >>
> >>
>


Re: Flink Job Submission Fails even though job is running

2020-01-21 Thread satya brat
How does web.timeout help here? The issue is with respect to the akka
dispatcher timing out. The job is submitted to the task managers but the
response doesn't reach the client.
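
For reference, a hedged flink-conf.yaml sketch of the two timeout settings usually involved on this path (the REST handler timeout and the akka ask timeout); the values below are hypothetical examples, not recommendations:

web.timeout: 60000       # REST handler ask timeout, in milliseconds (hypothetical value)
akka.ask.timeout: 60 s   # timeout for akka asks between Flink components (hypothetical value)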

On Tue, Jan 21, 2020 at 12:34 PM Yang Wang  wrote:

> Hi satya,
>
> Maybe the job has been submitted to the Dispatcher successfully but the
> internal job submission takes too long (more than 10s), so it fails with a
> timeout. Could you please set `web.timeout: 3` and run again?
>
>
>
> Best,
> Yang
>
> satya brat  wrote on Monday, January 20, 2020 at 4:34 PM:
>
>> I am using a standalone cluster of Flink with 1 jobManager and n
>> taskManagers. When I try to submit a job via the command line, the job
>> submission fails with the error message
>> org.apache.flink.client.program.ProgramInvocationException: Could not
>> submit job (JobID: f839aefee74aa4483ce8f8fd2e49b69e).
>>
>> On the jobManager instance, everything works fine until the job is switched
>> from DEPLOYING to RUNNING. After that, once the akka timeout expires, I see
>> the following stacktrace:
>>
>> akka.pattern.AskTimeoutException: Ask timed out on 
>> [Actor[akka://flink/user/dispatcher#-177004106]] after [10 ms]. 
>> Sender[null] sent message of type 
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>> at 
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>> at 
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>> at 
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>> at 
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>> at 
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>> at 
>> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>> at 
>> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>> at 
>> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> I went through the flink code on github and all the steps required to
>> execute a job seem to be running fine. However, when the jobManager has to
>> send the job submission ack to the flink client that triggered the job, the
>> jobSubmitHandler times out on the akka dispatcher, which according to my
>> understanding takes care of communicating with the job client.
>>
>> The Flink job consists of 1 Source (Kafka), 2 operators, and 1
>> sink (Custom Sink). The following link shows the jobManager logs:
>> https://pastebin.com/raw/3GaTtNrG
>>
>> Once the dispatcher times out, all other Flink UI calls also time out with
>> the same exception.
>>
>> Following are the logs of the flink client that is used to submit the job
>> via the command line.
>>
>> 2019-09-28 19:34:21,321 INFO  org.apache.flink.client.cli.CliFrontend
>>- 
>> 
>> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend
>>-  Starting Command Line Client (Version: 1.8.0, 
>> Rev:, Date:)
>> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend
>>-  OS current user: root
>> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend
>>-  Current Hadoop/Kerberos user: 
>> 2019-09-28 19:34:21,322 INFO  org.apache.flink.client.cli.CliFrontend
>>-  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle 
>> Corporation - 1.8/25.5-b02
>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend
>>-  Maximum heap size: 2677 MiBytes
>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend
>>-  JAVA_HOME: (not set)
>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend
>>-  No Hadoop Dependency available
>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend
>>-  JVM Options:
>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend
>>- 
>> -Dlog.file=/var/lib/fulfillment-stream-processor/flink-executables/flink-executables/log/flink-root-client-fulfillment-stream-processor-flink-task-manager-2-8047357.log
>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend
>>- 
>> -Dlog4j.configuration=file:/var/lib/fulfillment-stream-processor/flink-executables/flink-executables/conf/log4j-cli.properties
>> 2019-09-28 19:34:21,323 INFO  org.apache.flink.client.cli.CliFrontend
>>- 
>> -Dlogback.configurationFile=file:/var/lib/fulfillment-stream-processor/flink-executables/flink-executables/conf/logback.xml
>> 2019-09-28 19:34:21,323 INFO  

Re:Re: Is there any difference between CountEvictor and TriggerResult.FIRE_AND_PURGE when clearing window data?

2020-01-21 Thread USERNAME
Will the data dropped by the evictor also be removed from memory or RocksDB?






On 2020-01-21 17:27:38, "tison" wrote:
>I happened to be looking at this part; yes, there is a difference. Consider a sliding count window.
>
>[1] drops the entire windowState after firing, while [2] actually recomputes the windowState after eviction.
>
>Best,
>tison.
>
>
>USERNAME  wrote on Tuesday, January 21, 2020 at 5:21 PM:
>
>> Happy New Year, everyone~
>>
>>
>> [1] TriggerResult.FIRE_AND_PURGE
>>
>> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74
>> [2] CountEvictor
>>
>> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
>>
>>


Flink configuration on Docker deployment

2020-01-21 Thread Soheil Pourbafrani
Hi,

I need to set up a Flink cluster using Docker (and not docker-compose). I
could successfully start the jobmanager and taskmanager, but the problem is
I have no idea how to change their default configuration, for example to
give 8 slots to the taskmanager or to change the memory size of both the
jobmanager and the taskmanager.
It would be appreciated if somebody could tell me how to change the Flink
parameters on Docker.
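
For reference, a hedged sketch of one approach (paths, image tag, and values are hypothetical) assuming the official Flink images, which read their configuration from /opt/flink/conf: mount your own flink-conf.yaml over the one in the image.

docker run -d --name jobmanager \
  -v /opt/myconf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml \
  flink:1.9 jobmanager

docker run -d --name taskmanager --link jobmanager \
  -v /opt/myconf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml \
  flink:1.9 taskmanager

# The mounted flink-conf.yaml could contain, for example:
# jobmanager.rpc.address: jobmanager
# taskmanager.numberOfTaskSlots: 8
# jobmanager.heap.size: 2048m
# taskmanager.heap.size: 4096m

More recent official images also honor a FLINK_PROPERTIES environment variable whose contents are appended to flink-conf.yaml, which avoids mounting a file; whether a given image version supports it is worth checking against that image's entrypoint.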

Thanks


Re: Implementing a tick service

2020-01-21 Thread Dominik Wosiński
Hey,
you have access to the context in `onTimer`, so you can easily reschedule the
timer when it fires.

Best,
Dom.


Re: Implementing a tick service

2020-01-21 Thread Benoît Paris
Hello all!

Please disregard the last message; I used Thread.sleep() and Stateful Source
Functions.
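
For the record, a minimal sketch of what such a stateful source can look like (class and field names are hypothetical, not the author's code); the counter is kept in operator state so it survives restarts:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class TickSource implements SourceFunction<Long>, CheckpointedFunction {

    private volatile boolean running = true;
    private long counter;
    private transient ListState<Long> checkpointedCounter;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);          // emit and increment the counter
            }
            Thread.sleep(200L);                  // roughly every 200 ms of processing time
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCounter.clear();
        checkpointedCounter.add(counter);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCounter = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("counter", Long.class));
        for (Long restored : checkpointedCounter.get()) {
            counter = restored;
        }
    }
}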

But just out of curiosity, can processing-time Timers get rescheduled
inside the onTimer method?






On Mon, Jan 20, 2020 at 7:04 PM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hello all :)!
>
> I'm having trouble creating a tick service.
>
> Goal: register a TableSource that emits a Row roughly every 200ms in
> processing time. The Row would contain only one column "counter" that is
> incremented by 1 with each Row.
>
> Current attempt: Using TimerService
> A TableSource with
>
> public DataStream getDataStream(StreamExecutionEnvironment execEnv) {
> return execEnv
> .fromElements((Long) offset) // default 0L, one element
> .keyBy(new NullByteKeySelector<>())
> .process(new TickKeyedProcessFunction(200L))
> .forceNonParallel();
> }
>
> And a KeyedProcessFunction with onTimer doing the heavy-lifting:
>
> public void processElement(Long value, Context context, Collector<Long>
> collector) throws IOException {
> // called once
> counter.update(value);
> Long now = System.currentTimeMillis();
> context.timerService().registerProcessingTimeTimer(now);
> }
>
>  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) 
> throws Exception {
> Long then = timestamp + interval;
> Long current = counter.value();
> current++;
> counter.update(current);
> ctx.timerService().registerProcessingTimeTimer(then);
> out.collect(current);
> }
>
> Now, the runtime tells me the Source is in FINISHED status. So obviously
> there must be limitations around re-scheduling one key inside onTimer.
>
> Is there a way to use the TimerService to go around that?
> Also, how would you implement this tick service by other means?
>
> Cheers
> Ben
>
>
>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


[no subject]

2020-01-21 Thread Ankush Khanna




where does flink store the intermediate results of a join and what is the key?

2020-01-21 Thread kant kodali
Hi All,

If I run a query like this

StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
table1.col1 = table2.col1")

1) Where will flink store the intermediate result? Imagine flink-conf.yaml
says state.backend = 'rocksdb'

2) If the intermediate results are stored in rocksdb, then what is the key
and value in this case (given the query above)?

3) What is the best way to query these intermediate results from an
external application? while the job is running and while the job is not
running?

Thanks!


Re: Questions of "State Processing API in Scala"

2020-01-21 Thread Izual
Sorry for the wrong post.

> This can probably be confirmed by looking at the exception stack trace.
> Can you post a full copy of that?
I missed the history jobs, but I think you are right.
When I debugged the program to find the reason, I came across this code snippet.

```
  TypeSerializerSchemaCompatibility result =
previousSerializerSnapshot.resolveSchemaCompatibility(registeredSerializer);
  if (result.isIncompatible()) {
invalidateCurrentSchemaSerializerAccess();
  }
```

I remember one is
`org.apache.flink.api.common.typeutils.base.LongSerializer$LongSerializerSnapshot`,
and the other is just `Kryo`.

> Can you open a JIRA for this? I think it'll be a reasonable extension to
> the API.
I'll do that, thanks.

> I'm not sure what you mean here. Where is this keyBy happening? In the
> Scala DataStream job, or the State Processor API?
In the Scala DataStream job, the same as the examples of link 1 in the original
post.
I changed keyBy(_._1) to keyBy(0), and then the program throws an exception.

The full copy from job Exceptions:

```
java.io.IOException: Failed to restore state backend
at
org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Exception while creating
StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for
8f89af64b0cf95ff20b8dda264c66f81_8f89af64b0cf95ff20b8dda264c66f81_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 7 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore heap backend
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at
org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 9 more
Caused by: org.apache.flink.util.StateMigrationException: The new key
serializer must be compatible.
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:142)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 13 more
```

Maybe it's explainable by "inferred and serialized as their Java
counterparts"; not sure, I am a beginner in all three of Java, Scala, and Flink.
Thanks a lot.
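
For reference, a hedged Java analogue of that difference (the Scala API behaves similarly): keyBy on a field position keys the stream by the generic Tuple type, which tends to end up with a generic/Kryo key serializer, while a KeySelector keys it by the concrete field type and its native serializer (e.g. LongSerializer). The State Processor API has to be given the same key type the job actually used, otherwise the restore fails with "The new key serializer must be compatible." All names below are illustrative:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByComparison {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<Long, String>> stream =
            env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));

        // Keyed by position: the key type is the generic Tuple.
        KeyedStream<Tuple2<Long, String>, Tuple> byPosition = stream.keyBy(0);

        // Keyed by selector: the key type is Long, serialized with LongSerializer.
        KeyedStream<Tuple2<Long, String>, Long> bySelector =
            stream.keyBy(new KeySelector<Tuple2<Long, String>, Long>() {
                @Override
                public Long getKey(Tuple2<Long, String> value) {
                    return value.f0;
                }
            });
    }
}

Presumably, when reading such state back, readKeyedState would need to be given the key TypeInformation that matches whichever variant the job used.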



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Is there any difference between CountEvictor and TriggerResult.FIRE_AND_PURGE when clearing window data?

2020-01-21 Thread tison
I happened to be looking at this part; yes, there is a difference. Consider a sliding count window.

[1] drops the entire windowState after firing, while [2] actually recomputes the windowState after eviction.

Best,
tison.


USERNAME  wrote on Tuesday, January 21, 2020 at 5:21 PM:

> Happy New Year, everyone~
>
>
> [1] TriggerResult.FIRE_AND_PURGE
>
> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74
> [2] CountEvictor
>
> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
>
>


Is there any difference between CountEvictor and TriggerResult.FIRE_AND_PURGE when clearing window data?

2020-01-21 Thread USERNAME
Happy New Year, everyone~


[1] TriggerResult.FIRE_AND_PURGE
https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74
[2] CountEvictor
https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377



about registering completion function for worker shutdown

2020-01-21 Thread Dominique De Vito
Hi,

For a Flink batch job, some values are written to Kafka through a Producer.

I want to register a hook for closing, at the end, the Kafka producer a
worker is using; the hook would be executed, of course, on the worker side.

Is there a way to do so?

Thanks.

Regards,
Dominique
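
For reference, a hedged sketch of one common way to get such a hook: own the producer inside a Rich* function, whose open() and close() run on the worker, with close() invoked when the task finishes. Topic, broker address, and class names below are hypothetical:

import java.util.Properties;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaWritingMapper extends RichMapFunction<String, String> {

    private transient KafkaProducer<String, String> producer;

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");   // hypothetical broker address
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);           // created on the worker
    }

    @Override
    public String map(String value) {
        producer.send(new ProducerRecord<>("my-topic", value));   // hypothetical topic
        return value;
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close();   // runs on the worker when the task finishes
        }
    }
}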