Trying to eagerly schedule a task whose inputs are not ready

2020-07-15 Thread jiafu
flink
org.apache.flink.runtime.executiongraph.ExecutionGraphException: Trying to eagerly schedule a task whose inputs are not ready (result type: PIPELINED_BOUNDED, partition consumable: false, producer state: SCHEDULED, producer slot: null).
	at org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor.fromEdges(InputChannelDeploymentDescriptor.java:145)
	at org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:840)
	at org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:621)
	at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
	at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
	at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010)
	at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:436)
	at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:637)
	at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:229)
	at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:186)
	at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:96)
	at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:146)
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:633)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
	at org.apache.flink.runtime.executiongraph.Execution.lambda$releaseAssignedResource$11(Execution.java:1350)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
	at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1345)
	at org.apache.flink.runtime.executiongraph.Execution.finishCancellation(Execution.java:1115)
	at org.apache.flink.runtime.executiongraph.Execution.completeCancelling(Execution.java:1094)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1628)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:517)
	at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: [sql-client] How do I set checkpointing.interval when submitting SQL via sql-client

2020-07-15 Thread Harold.Miao
Do you mean configuring this parameter in flink-conf.yaml?
execution.checkpointing.interval


godfrey he  wrote on Thu, Jul 16, 2020 at 1:37 PM:

> Configuring checkpointing.interval in sql-client-defaults.yaml is not supported yet;
> you can set it in flink-conf.yaml instead.
>
> Harold.Miao  wrote on Thu, Jul 16, 2020 at 1:27 PM:
>
> > hi flink users
> >
> > How do I set checkpointing.interval when submitting SQL via the sql-client?
> > I looked at the execution section of sql-client-defaults.yaml and did not find this parameter. Any advice?
> > Thanks
> >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


Re: [sql-client] How do I set checkpointing.interval when submitting SQL via sql-client

2020-07-15 Thread godfrey he
Configuring checkpointing.interval in sql-client-defaults.yaml is not supported yet;
you can set it in flink-conf.yaml instead.
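For illustration, the corresponding flink-conf.yaml entry would look like this (the 10s interval is only an example value):

# flink-conf.yaml
execution.checkpointing.interval: 10s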

Harold.Miao  wrote on Thu, Jul 16, 2020 at 1:27 PM:

> hi flink users
>
> How do I set checkpointing.interval when submitting SQL via the sql-client?
> I looked at the execution section of sql-client-defaults.yaml and did not find this parameter. Any advice?
> Thanks
>
>
>
> --
>
> Best Regards,
> Harold Miao
>


[sql-client] How do I set checkpointing.interval when submitting SQL via sql-client

2020-07-15 Thread Harold.Miao
hi flink users

How do I set checkpointing.interval when submitting SQL via the sql-client?
I looked at the execution section of sql-client-defaults.yaml and did not find this parameter. Any advice?
Thanks



-- 

Best Regards,
Harold Miao


Re: Flink 1.11 submit job timed out

2020-07-15 Thread Congxian Qiu
Hi
If there are no exceptions and GC looks normal, it may help to check the pod logs, and the ZooKeeper logs if HA is enabled. I once saw a similar symptom in a YARN
environment that turned out to have a different root cause, which we found by reading the NM logs and the zk logs.

Best,
Congxian


SmileSmile  wrote on Wed, Jul 15, 2020 at 5:20 PM:

> Hi Roc
>
> This did not happen on 1.10.1; it only shows up on 1.11. How would you suggest investigating it?
>
>
> a511955993
> Email: a511955...@163.com
>
> (Signature by NetEase Mail Master)
>
> On 07/15/2020 17:16, Roc Marshal wrote:
> Hi, SmileSmile.
> I have run into a similar host-resolution problem before; you could start by checking the network mapping of the k8s pod nodes.
> Hope this helps.
>
>
> Best regards.
> Roc Marshal
>
>
> On 2020-07-15 17:04:18, "SmileSmile"  wrote:
> >
> >Hi
> >
> >We are running Flink 1.11 as a Kubernetes session cluster, with 30 TMs, 4 slots per TM, and a job
> >parallelism of 120. When the job is submitted we get a flood of "No hostname could be resolved for the IP address" messages, the JM times
> >out, and the submission fails. The web UI also hangs and stops responding.
> >
> >Even a WordCount at parallelism 1 prints a few of the "no hostname" lines before submitting normally; once the parallelism goes up, the submission times out.
> >
> >
> >Part of the log:
> >
> >2020-07-15 16:58:46,460 WARN
> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> hostname could be resolved for the IP address 10.32.160.7, using IP address
> as host name. Local input split assignment (such as for HDFS files) may be
> impacted.
> >2020-07-15 16:58:46,460 WARN
> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> hostname could be resolved for the IP address 10.44.224.7, using IP address
> as host name. Local input split assignment (such as for HDFS files) may be
> impacted.
> >2020-07-15 16:58:46,461 WARN
> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> hostname could be resolved for the IP address 10.40.32.9, using IP address
> as host name. Local input split assignment (such as for HDFS files) may be
> impacted.
> >
> >2020-07-15 16:59:10,236 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The
> heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
> >2020-07-15 16:59:10,236 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Disconnect job manager 
> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2 for job
> e1554c737e37ed79688a15c746b6e9ef from the resource manager.
> >
> >
> >How should I deal with this?
> >
> >
> >Best!
> >
> >a511955993
> >Email: a511955...@163.com
> >
> >(Signature by NetEase Mail Master)
>


Re: Reply: RE: flink state

2020-07-15 Thread Congxian Qiu
Hi
Broadcast state cannot be modified from the non-broadcast side. If you still want to modify it, you can use zhao liang's approach; alternatively, if this global state
does not need consistency guarantees, you could also keep it in an external store (Redis, HBase, etc.).

Best,
Congxian
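As a minimal Java sketch of the constraint described above (class, state, and field names are invented for illustration): in a KeyedBroadcastProcessFunction only processBroadcastElement receives a writable handle to the broadcast state, while processElement only gets a read-only view, so the keyed side cannot update it.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: keyed events are Strings, broadcast rules arrive as (ruleKey, ruleValue) tuples.
public class RuleEnricher
        extends KeyedBroadcastProcessFunction<String, String, Tuple2<String, String>, String> {

    // Must match the descriptor passed to ruleStream.broadcast(...).
    private static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // Keyed (non-broadcast) side: read-only view, no put()/remove() available here.
        String rule = ctx.getBroadcastState(RULES).get(value);
        out.collect(value + "|" + rule);
    }

    @Override
    public void processBroadcastElement(Tuple2<String, String> rule, Context ctx, Collector<String> out) throws Exception {
        // Broadcast side: the only place where the broadcast state can be written.
        ctx.getBroadcastState(RULES).put(rule.f0, rule.f1);
    }
}

If keys derived from keyed state really have to become globally visible, the practical options are the ones mentioned above: feed them back into the job as another (broadcast) stream, or keep them in an external store.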


zhao liang  wrote on Wed, Jul 15, 2020 at 6:05 PM:

> Broadcast state cannot satisfy your requirement. You will probably have to do what I did: fold the relevant state data into the data stream itself and distinguish the
> record types inside the operators, which amounts to maintaining the changes of this "broadcast" stream by hand.
>
> From: Robert.Zhang <173603...@qq.com>
> Date: Wednesday, July 15, 2020 15:22
> To: user-zh , user-zh@flink.apache.org <
> user-zh@flink.apache.org>
> Subject: Reply: RE: flink state
> The thing is, the problem is that I need to use keyed state to modify the broadcast state, e.g. based on the keyed
> state, store keys that satisfy certain conditions into this broadcast state, and use this broadcast
> state when other operators do their computation, e.g. those keys are needed there.
> The documentation says the non-broadcast side cannot modify the broadcast state, it is read-only,
> so it seems this cannot be implemented directly.
>
>
>
>
>
> -- Original message --
> From: "zhao liang"  Sent: Tuesday, July 14, 2020 at 4:09 PM
> To: "user-zh"  Subject: RE: flink state
>
>
>
> I have a similar implementation here. The stream processing needs to change based on dimension-table data, so I defined a custom source (which periodically reloads the dimension table from MySQL) and unioned the Kafka stream
> with this dimension data stream,
> adding an extra record type (dimension record or fact record) for processing; downstream operators then handle the dimension records differently and store them in their own state.
>
> From: Congxian Qiu  Date: Tuesday, July 14, 2020 14:03
> To: user-zh  Subject: Re: flink state
> Hi Robert
>
> Would Broadcast state[1] meet your needs? You could also take a look at this article[2].
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
> [2] https://cloud.tencent.com/developer/article/1509789
> Best,
> Congxian
>
>
Robert.Zhang <173603...@qq.com> wrote on Mon, Jul 13, 2020 at 9:50 PM:
>
>  Hello, all
>  I have run into a problem in a streaming job:
>  I want to use a global state across all keyed streams, or a global
>  parameter. The main requirement is that this state is mutable: it needs to be modified, and the modifications must be visible to all stream
>  operators. Has anyone run into a similar scenario, or could you suggest an approach? Much appreciated.
> 
> 
>  Best regards
>


Re: flink sql 1.11 create hive table error

2020-07-15 Thread Leonard Xu
Hello, Zach

Yes, this will be supported in 1.12; the PR [1] is already open and under review.

Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18588 
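Until then, one possible workaround on 1.11 (only a sketch, not something confirmed in this thread; it reuses the catalog, database, and table names from the quoted program) is to guard a plain CREATE TABLE, without IF NOT EXISTS, behind a catalog existence check:

import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public final class CreateHiveTableIfAbsent {

    // Only issues the DDL when the Hive catalog does not contain the table yet.
    public static void createIfAbsent(TableEnvironment tableEnv, HiveCatalog hiveCatalog) {
        ObjectPath tablePath = new ObjectPath("ods", "hive_table");
        if (!hiveCatalog.tableExists(tablePath)) {
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            // Same DDL as in the quoted program (shortened here), just without IF NOT EXISTS,
            // which the Hive dialect parser in 1.11 rejects.
            tableEnv.executeSql(
                "CREATE TABLE odsCatalog.ods.hive_table (user_id STRING, age INT) "
                    + "PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet");
        }
    }
}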

> On Jul 16, 2020, at 12:07, Zhou Zach  wrote:
> 
> Hi all,
> Does flink sql 1.11 CREATE TABLE not support IF NOT EXISTS?
> 
> 
> Query:
>val hiveConfDir = "/etc/hive/conf" 
>val hiveVersion = "2.1.1"
> 
>val odsCatalog = "odsCatalog"
>val odsHiveCatalog = new HiveCatalog(odsCatalog, "ods", hiveConfDir, 
> hiveVersion)
>streamTableEnv.registerCatalog(odsCatalog, odsHiveCatalog)
> 
>streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>streamTableEnv.executeSql(
>  """
>|
>|CREATE TABLE IF NOT EXISTS odsCatalog.ods.hive_table (
>|  user_id STRING,
>|  age INT
>|) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet 
> TBLPROPERTIES (
>|  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>|  'sink.partition-commit.trigger'='partition-time',
>|  'sink.partition-commit.delay'='0s',
>|  'sink.partition-commit.policy.kind'='metastore'
>|)
>|
>|""".stripMargin)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> java.util.concurrent.CompletionException: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.
>   at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  ~[?:1.8.0_161]
>   at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  ~[?:1.8.0_161]
>   at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
> ~[?:1.8.0_161]
>   at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>  ~[?:1.8.0_161]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_161]
>   at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_161]
>   at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
>  ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
>  ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_161]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [?:1.8.0_161]
>   at 
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>  [data-flow-1.0.jar:?]
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
> [data-flow-1.0.jar:?]
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>  [data-flow-1.0.jar:?]
>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [data-flow-1.0.jar:?]
>   at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [data-flow-1.0.jar:?]
>   at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [data-flow-1.0.jar:?]
>   at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [data-flow-1.0.jar:?]
> Caused by: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.
>   ... 11 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error: SQL parse failed. Encountered "NOT" at line 3, 
> column 17.
> Was expecting one of:
> 
>"ROW" ...
>"COMMENT" ...
>"LOCATION" ...
>"PARTITIONED" ...
>"STORED" ...
>"TBLPROPERTIES" ...
>"(" ...
>"." ...
> 
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>  ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>  ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>  ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   ... 10 more
> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
> Encountered "NOT" at line 3, column 17.
> Was expecting one of:
> 
>"ROW" ...
>"COMMENT" ...
>"LOCATION" ...
>"PARTITIONED" ...
>"STORED" ...
>"TBLPROPERTIES" ...
>"(" ...
>"." ...
> 
>   at 
> 

Re: flink 1.11 upsert produces incorrect results

2020-07-15 Thread Leonard Xu


> On Jul 16, 2020, at 11:44, 小学生 <201782...@qq.com> wrote:
> 
> t_env.execute_sql('''delete from source_tab where trck_id='aew' ''')

The table you defined here is a Flink table that maps to a table in your external system (the MySQL database). Flink does not support DELETE on a table [1]. Flink
is a compute engine whose main use case is reading from and writing to external systems;
modifying data in the external system currently only happens while writing (insert), and mainly to guarantee data-consistency semantics Flink may need to send Delete messages to the downstream system.
Those delete messages are handled by each connector itself, so users never call delete explicitly. You can refer to [2] for more details.
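A hedged SQL sketch of that last point (the sink table and column names are invented; only source_tab comes from the quoted job): with a primary-keyed JDBC sink, the update and delete messages produced by a changing query result are applied to MySQL by the connector itself, with no DELETE statement in the user's SQL.

CREATE TABLE score_agg_sink (
    trck_id VARCHAR,
    cnt BIGINT,
    PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/bdt',
    'table-name' = 'score_agg',
    'username' = 'root',
    'password' = '...'
);

-- The grouped result changes as rows arrive; the JDBC connector translates the resulting
-- upsert/delete changelog into UPDATE/DELETE statements against MySQL.
INSERT INTO score_agg_sink
SELECT trck_id, COUNT(*) AS cnt FROM source_tab GROUP BY trck_id;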

Best
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html

Re: Help with a problem testing Flink 1.11's built-in connectors

2020-07-15 Thread godfrey he
In the current 1.11 release, tableResult.print only supports exactly-once semantics, so checkpointing has to be configured.

1.12 is going to support at-least-once semantics so that users will not have to configure checkpointing; the PR [1] is currently under review.

[1] https://github.com/apache/flink/pull/12867

Best,
Godfrey
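A minimal sketch of what this means on 1.11 (class name, schema, and the 10-second interval are made up for illustration): enable checkpointing on the TableEnvironment configuration before calling print() on a datagen-backed query.

import java.time.Duration;

import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DatagenPrintExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // tableResult.print() needs checkpointing in 1.11, so enable it on the table config.
        tEnv.getConfig().getConfiguration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));

        tEnv.executeSql(
                "CREATE TABLE datagen_source (id INT, name STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("SELECT * FROM datagen_source").print();
    }
}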

Jingsong Li  wrote on Thu, Jul 16, 2020 at 11:36 AM:

>  tableResult.print requires checkpointing to be configured
>
> Best,
> Jingsong
>
> On Thu, Jul 16, 2020 at 11:31 AM amen...@163.com  wrote:
>
> > hi, everyone
> >
> > As a newcomer testing the three connectors newly built into Flink 1.11,
> > I created the job shown in the screenshot [1] locally and tried to print its data,
> > but the console only printed the table schema and produced no data according to the built-in datagen
> > connector's rules. What could be the reason? Thanks!
> >
> >
> > [1] https://postimg.cc/PprT9XV6
> >
> > best,
> > amenhub
> >
> >
> >
> > amen...@163.com
> >
>
>
> --
> Best, Jingsong Lee
>


flink sql 1.11 create hive table error

2020-07-15 Thread Zhou Zach
Hi all,
Does flink sql 1.11 CREATE TABLE not support IF NOT EXISTS?


Query:
val hiveConfDir = "/etc/hive/conf" 
val hiveVersion = "2.1.1"

val odsCatalog = "odsCatalog"
val odsHiveCatalog = new HiveCatalog(odsCatalog, "ods", hiveConfDir, 
hiveVersion)
streamTableEnv.registerCatalog(odsCatalog, odsHiveCatalog)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
|
|CREATE TABLE IF NOT EXISTS odsCatalog.ods.hive_table (
|  user_id STRING,
|  age INT
|) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet 
TBLPROPERTIES (
|  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
|  'sink.partition-commit.trigger'='partition-time',
|  'sink.partition-commit.delay'='0s',
|  'sink.partition-commit.policy.kind'='metastore'
|)
|
|""".stripMargin)












java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_161]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_161]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_161]
at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
 [data-flow-1.0.jar:?]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
[data-flow-1.0.jar:?]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 [data-flow-1.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[data-flow-1.0.jar:?]
Caused by: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: SQL parse failed. Encountered "NOT" at line 3, column 
17.
Was expecting one of:
 
"ROW" ...
"COMMENT" ...
"LOCATION" ...
"PARTITIONED" ...
"STORED" ...
"TBLPROPERTIES" ...
"(" ...
"." ...

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
... 10 more
Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
Encountered "NOT" at line 3, column 17.
Was expecting one of:
 
"ROW" ...
"COMMENT" ...
"LOCATION" ...
"PARTITIONED" ...
"STORED" ...
"TBLPROPERTIES" ...
"(" ...
"." ...

at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76) 
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
 

Re: flink 1.11 upsert produces incorrect results

2020-07-15 Thread 小学生
Here is the delete code I tried:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE source_tab (
trck_id VARCHAR,
score INT,
PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
'table-name' = 'g', 
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
sink="""
CREATE TABLE sink_tab (
trck_id VARCHAR,
score INT,
PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
'table-name' = 'g_copy', 
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)


t_env.execute_sql(source)
t_env.execute_sql(sink)


t_env.execute_sql('''delete from source_tab where trck_id='aew' ''')
table_result1=t_env.execute_sql('''insert into sink_tab select * from 
source_tab ''')
table_result1.get_job_client().get_job_execution_result().result()

FlinkKafkaConsumer API: dimension table join

2020-07-15 Thread 郑斌斌
Hello everyone:

A question: when using the FlinkKafkaConsumer API, how can I do a dimension-table join in SQL? (This was
possible before with the Kafka API.)
 "select  a.id,b.name from kafka_table a "
+ "join dim_table FOR SYSTEM_TIME AS OF a.proctime as b on a.id = 
b.user_id";

thanks & Regards

Re: flink 1.11 upsert produces incorrect results

2020-07-15 Thread 小学生
??pyflink??

Re: flink 1.11 upsert produces incorrect results

2020-07-15 Thread Xingbo Huang
Hi,
Leonard is right. Apart from the UDF part, all of pyflink's APIs call into functionality on the Java side, so if the Java side does not have it, pyflink does not support it either.

Best,
Xingbo

Leonard Xu  wrote on Thu, Jul 16, 2020 at 11:09 AM:

> Hi,
>
> My understanding is that pyflink eventually goes through the Java code you are looking at. I am not very familiar with pyflink, cc xingbo to add more.
>
> Best
> Leonard Xu
>
> > On Jul 16, 2020, at 11:04, 小学生 <201782...@qq.com> wrote:
> >
> > Hi everyone, since I am not very familiar with Java, may I ask whether pyflink has anything related to MySQL delete? I did not see it in the official docs. Thanks!
>
>


Re: flink 1.11 upsert produces incorrect results

2020-07-15 Thread Leonard Xu
Hi,

My understanding is that pyflink eventually goes through the Java code you are looking at. I am not very familiar with pyflink, cc xingbo to add more.

Best
Leonard Xu

> On Jul 16, 2020, at 11:04, 小学生 <201782...@qq.com> wrote:
> 
> Hi everyone, since I am not very familiar with Java, may I ask whether pyflink has anything related to MySQL delete? I did not see it in the official docs. Thanks!



Re: flink 1.11 upsert produces incorrect results

2020-07-15 Thread 小学生
Hi everyone, since I am not very familiar with Java, may I ask whether pyflink has anything related to MySQL delete? I did not see it in the official docs. Thanks!

ElasticSearch_Sink

2020-07-15 Thread C DINESH
Hello All,

Can we implement 2 Phase Commit Protocol for elastic search sink. Will
there be any limitations?

Thanks in advance.

Warm regards,
Dinesh.


Re: HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

2020-07-15 Thread Leonard Xu
Hi, Jim

Could you post error message in text that contains the entire schema of query 
and sink? I doubt there are some  fields type were mismatched.

Best,
Leonard Xu

> 在 2020年7月16日,10:29,Jim Chen  写道:
> 
> Hi,
>   I use flink1.10.1 sql to connect to hbase1.4.3. When inserting into hbase, it reports
> an error like validateSchemaAndApplyImplicitCast, meaning that the Query Schema
> and Sink Schema are inconsistent.
>   Mainly Row (EXPR$0) in Query Schema, which are all expressions. Sink Schema 
> is Row(device_id). I don't know how to write in sql to be consistent with 
> hbase's sink schema.
>   I try to write sql like select device_id as rowkey, ROW( device_id as 
> [cannot write as]  ) as f1
> 
> error message as follows:
> 
> 
> sample code like:
> HBase sink ddl:
> String ddlSource = "CREATE TABLE 
> test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
> "  rowkey STRING,\n" +
> "  f1 ROW< \n" +
> "device_id STRING,\n" +
> "pass_id STRING,\n" +
> "first_date STRING,\n" +
> "first_channel_id STRING,\n" +
> "first_app_version STRING,\n" +
> "first_server_time STRING,\n" +
> "first_server_hour STRING,\n" +
> "first_ip_location STRING,\n" +
> "first_login_time STRING,\n" +
> "sys_can_uninstall STRING,\n" +
> "update_date STRING,\n" +
> "server_time BIGINT,\n" +
> "last_pass_id STRING,\n" +
> "last_channel_id STRING,\n" +
> "last_app_version STRING,\n" +
> "last_date STRING,\n" +
> "os STRING,\n" +
> "attribution_channel_id STRING,\n" +
> "attribution_first_date STRING,\n" +
> "p_product STRING,\n" +
> "p_project STRING,\n" +
> "p_dt STRING\n" +
> ">\n" +
> ") WITH (\n" +
> "  'connector.type' = 'hbase',\n" +
> "  'connector.version' = '1.4.3',\n" + // 
> even if the grammar check is bypassed and a different HBase version is used, the problem remains; e.g. the version in production does not work
> "  'connector.table-name' = 
> 'dw_common_mobile_device_user_mapping_new',\n" +
> "  'connector.zookeeper.quorum' = '"+ zookeeperServers 
> +"',\n" +
> "  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
> "  'connector.write.buffer-flush.max-size' = '2mb',\n" +
> "  'connector.write.buffer-flush.max-rows' = '1000',\n" +
> "  'connector.write.buffer-flush.interval' = '2s'\n" +
> ")";
> 
> insert into sql:
> 
> String bodyAndLocalSql = "" +
> //"insert into 
> test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
> "SELECT CAST(rowkey AS STRING) AS rowkey, " +
> " ROW(" +
> " device_id, pass_id, first_date, first_channel_id, 
> first_app_version, first_server_time, first_server_hour, first_ip_location, 
> first_login_time, sys_can_uninstall, update_date, server_time, last_pass_id, 
> last_channel_id, last_app_version, last_date, os, attribution_channel_id, 
> attribution_first_date, p_product, p_project, p_dt " +
> ") AS f1" +
> " FROM " +
> "(" +
> " SELECT " +
> " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, 
> kafka.p_project)) AS rowkey, " +
> " kafka.uid AS device_id " +
> ",kafka.pass_id " +
> 
> // first_date
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= 
> kafka.server_time " +
> // new user
> " THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd') " +
> // existing user
> " ELSE hbase.first_date END AS first_date " +
> 
> // first_channel_id
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= 
> kafka.server_time " +
> // new user
> " THEN kafka.wlb_channel_id" +
> // existing user
> " ELSE hbase.first_channel_id END AS first_channel_id " +
> 
> // first_app_version
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= 
> kafka.server_time " +
> // new user
> " THEN kafka.app_version " +
> // existing user
> " ELSE hbase.first_app_version END AS first_app_version " +
> 
> // first_server_time
> ",CASE WHEN COALESCE(hbase.server_time, 0) <= 
> kafka.server_time " +
> // new user
> " THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd 
> HH:mm:ss') " +
> // existing user
>

Re: Print table content in Flink 1.11

2020-07-15 Thread Jingsong Li
Hi Flavio,

For print:
- As Kurt said, you can use `table.execute().print();`, records will be
collected to the client (NOTE it is client) and print to client console.
- But if you want print records in runtime tasks like DataStream.print, you
can use [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/print.html
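To make the second option concrete, a print sink can be declared roughly like this (table name and schema are invented):

CREATE TABLE print_sink (
    word STRING,
    cnt BIGINT
) WITH (
    'connector' = 'print'
);

-- INSERT INTO print_sink SELECT ... then writes every record to the TaskManagers'
-- standard output at runtime instead of collecting it to the client.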

Best,
Jingsong

On Thu, Jul 16, 2020 at 10:18 AM Kurt Young  wrote:

> Hi Flavio,
>
> In 1.11 we have provided an easier way to print table content, after you
> got the `table` object,
> all you need to to is calling `table.execute().print();`
>
> Best,
> Kurt
>
>
> On Thu, Jul 16, 2020 at 9:35 AM Leonard Xu  wrote:
>
>> Hi, Flavio
>>
>>
>> On Jul 16, 2020, at 00:19, Flavio Pompermaier  wrote:
>>
>> final JobExecutionResult jobRes = tableEnv.execute("test-job");
>>
>>
>> In Flink 1.11, once a Table has transformed to DataStream, only
>> StreamExecutionEnvironment can execute the DataStream program, please use
>> env.execute(“test-job”) in this case, you can get mote information from [1].
>>
>>
>> Best,
>> Leonard Xu
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>
>>

-- 
Best, Jingsong Lee


HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

2020-07-15 Thread Jim Chen
Hi,
  I use flink1.10.1 sql to connect to hbase1.4.3. When inserting into hbase, it
reports an error like validateSchemaAndApplyImplicitCast, meaning that the
Query Schema and Sink Schema are inconsistent.
  Mainly Row (EXPR$0) in Query Schema, which are all expressions. Sink
Schema is Row(device_id). I don't know how to write in sql to be consistent
with hbase's sink schema.
  I try to write sql like select device_id as rowkey, ROW( device_id as
[cannot write as]  ) as f1

error message as follows:
[image: image.png]

sample code like:
HBase sink ddl:
String ddlSource = "CREATE TABLE
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
"  rowkey STRING,\n" +
"  f1 ROW< \n" +
"device_id STRING,\n" +
"pass_id STRING,\n" +
"first_date STRING,\n" +
"first_channel_id STRING,\n" +
"first_app_version STRING,\n" +
"first_server_time STRING,\n" +
"first_server_hour STRING,\n" +
"first_ip_location STRING,\n" +
"first_login_time STRING,\n" +
"sys_can_uninstall STRING,\n" +
"update_date STRING,\n" +
"server_time BIGINT,\n" +
"last_pass_id STRING,\n" +
"last_channel_id STRING,\n" +
"last_app_version STRING,\n" +
"last_date STRING,\n" +
"os STRING,\n" +
"attribution_channel_id STRING,\n" +
"attribution_first_date STRING,\n" +
"p_product STRING,\n" +
"p_project STRING,\n" +
"p_dt STRING\n" +
">\n" +
") WITH (\n" +
"  'connector.type' = 'hbase',\n" +
"  'connector.version' = '1.4.3',\n" + //
even if the grammar check is bypassed and a different HBase version is used, the problem remains; e.g. the version in production does not work
"  'connector.table-name' =
'dw_common_mobile_device_user_mapping_new',\n" +
"  'connector.zookeeper.quorum' = '"+ zookeeperServers
+"',\n" +
"  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
"  'connector.write.buffer-flush.max-size' = '2mb',\n" +
"  'connector.write.buffer-flush.max-rows' = '1000',\n" +
"  'connector.write.buffer-flush.interval' = '2s'\n" +
")";

insert into sql:

String bodyAndLocalSql = "" +
//"insert into
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
"SELECT CAST(rowkey AS STRING) AS rowkey, " +
" ROW(" +
" device_id, pass_id, first_date, first_channel_id,
first_app_version, first_server_time, first_server_hour, first_ip_location,
first_login_time, sys_can_uninstall, update_date, server_time,
last_pass_id, last_channel_id, last_app_version, last_date, os,
attribution_channel_id, attribution_first_date, p_product, p_project, p_dt
" +
") AS f1" +
" FROM " +
"(" +
" SELECT " +
" MD5(CONCAT_WS('|', kafka.uid, kafka.p_product,
kafka.p_project)) AS rowkey, " +
" kafka.uid AS device_id " +
",kafka.pass_id " +

// first_date
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd') " +
// existing user
" ELSE hbase.first_date END AS first_date " +

// first_channel_id
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN kafka.wlb_channel_id" +
// existing user
" ELSE hbase.first_channel_id END AS first_channel_id " +

// first_app_version
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN kafka.app_version " +
// existing user
" ELSE hbase.first_app_version END AS first_app_version " +

// first_server_time
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd
HH:mm:ss') " +
// existing user
" ELSE hbase.first_server_time END AS first_server_time " +

// first_server_hour
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
// existing user
" ELSE hbase.first_server_hour END AS first_server_hour " +

// first_ip_location

Re: Print table content in Flink 1.11

2020-07-15 Thread Kurt Young
Hi Flavio,

In 1.11 we have provided an easier way to print table content, after you
got the `table` object,
all you need to to is calling `table.execute().print();`

Best,
Kurt


On Thu, Jul 16, 2020 at 9:35 AM Leonard Xu  wrote:

> Hi, Flavio
>
>
> On Jul 16, 2020, at 00:19, Flavio Pompermaier  wrote:
>
> final JobExecutionResult jobRes = tableEnv.execute("test-job");
>
>
> In Flink 1.11, once a Table has transformed to DataStream, only
> StreamExecutionEnvironment can execute the DataStream program, please use
> env.execute(“test-job”) in this case, you can get mote information from [1].
>
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>
>


HELP: flink 1.10 sql integrated with hbase reports a validateSchemaAndApplyImplicitCast error on insert into

2020-07-15 Thread Jim Chen
Hi,

I am using the SQL features of flink 1.10.1 with HBase 1.4.3. When writing to HBase it reports a validateSchemaAndApplyImplicitCast error, meaning that the Query
Schema and the Sink Schema are inconsistent. The Query Schema mainly contains Row(EXPR$0), i.e. all expressions, while the Sink
Schema looks like Row(device_id). I do not know how to write the SQL so that it stays consistent with the HBase sink schema.

I tried something like select device_id as rowkey, ROW( device_id as [cannot use 'as' here] ) as
f1; without it, the ROW schema in the Query consists of expressions rather than concretely defined fields.

The number of fields in the query and the sink does match, and the type of each field matches as well; it is just that the Query Schema contains expressions, so it cannot be made consistent.

The error message is as follows:
[image: image.png]

Key code:
HBase sink ddl:
String ddlSource = "CREATE TABLE
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
"  rowkey STRING,\n" +
"  f1 ROW< \n" +
"device_id STRING,\n" +
"pass_id STRING,\n" +
"first_date STRING,\n" +
"first_channel_id STRING,\n" +
"first_app_version STRING,\n" +
"first_server_time STRING,\n" +
"first_server_hour STRING,\n" +
"first_ip_location STRING,\n" +
"first_login_time STRING,\n" +
"sys_can_uninstall STRING,\n" +
"update_date STRING,\n" +
"server_time BIGINT,\n" +
"last_pass_id STRING,\n" +
"last_channel_id STRING,\n" +
"last_app_version STRING,\n" +
"last_date STRING,\n" +
"os STRING,\n" +
"attribution_channel_id STRING,\n" +
"attribution_first_date STRING,\n" +
"p_product STRING,\n" +
"p_project STRING,\n" +
"p_dt STRING\n" +
">\n" +
") WITH (\n" +
"  'connector.type' = 'hbase',\n" +
"  'connector.version' = '1.4.3',\n" + //
even if the grammar check is bypassed and a different HBase version is used, the problem remains; e.g. the version in production does not work
"  'connector.table-name' =
'dw_common_mobile_device_user_mapping_new',\n" +
"  'connector.zookeeper.quorum' = '"+ zookeeperServers
+"',\n" +
"  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
"  'connector.write.buffer-flush.max-size' = '2mb',\n" +
"  'connector.write.buffer-flush.max-rows' = '1000',\n" +
"  'connector.write.buffer-flush.interval' = '2s'\n" +
")";

insert into sql:

String bodyAndLocalSql = "" +
//"insert into
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
"SELECT CAST(rowkey AS STRING) AS rowkey, " +
" ROW(" +
" device_id, pass_id, first_date, first_channel_id,
first_app_version, first_server_time, first_server_hour, first_ip_location,
first_login_time, sys_can_uninstall, update_date, server_time,
last_pass_id, last_channel_id, last_app_version, last_date, os,
attribution_channel_id, attribution_first_date, p_product, p_project, p_dt
" +
") AS f1" +
" FROM " +
"(" +
" SELECT " +
" MD5(CONCAT_WS('|', kafka.uid, kafka.p_product,
kafka.p_project)) AS rowkey, " +
" kafka.uid AS device_id " +
",kafka.pass_id " +

// first_date
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd') " +
// existing user
" ELSE hbase.first_date END AS first_date " +

// first_channel_id
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN kafka.wlb_channel_id" +
// existing user
" ELSE hbase.first_channel_id END AS first_channel_id " +

// first_app_version
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN kafka.app_version " +
// existing user
" ELSE hbase.first_app_version END AS first_app_version " +

// first_server_time
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd
HH:mm:ss') " +
// existing user
" ELSE hbase.first_server_time END AS first_server_time " +

// first_server_hour
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
// existing user
" ELSE hbase.first_server_hour END AS first_server_hour " +

// first_ip_location
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
 

Re: Re: flink 1.11 sql type question

2020-07-15 Thread sunfulin



hi, leonard
Thanks for the reply. I added this to the WITH options of the ES DDL, but it still seems to throw the error. Let me briefly describe my scenario again:
The DDL of my ES sink is as follows:
create table es_sink (
  a varchar,
  b varchar,
  c TIMESTAMP(9) WITH LOCAL TIME ZONE
) with (
  
)


I use the processing-time attribute and convert the stream's proctime into a UTC-style timestamp, which is written as field c. Is this natively supported now? In 1.10 it could apparently be written directly, but in 1.11 the value is written without the time zone, which breaks compatibility with the previous format.














On 2020-07-16 09:40:06, "Leonard Xu"  wrote:
>Hello
>
>Parsing UTC timestamps from JSON is supported; try specifying the JSON timestamp format in your WITH options: json.timestamp-format.standard
>= 'ISO-8601'
>
>Best
>Leonard Xu
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> 
>
>
>> On Jul 15, 2020, at 23:19, sunfulin  wrote:
>> 
>> hi,
>> I defined an ES sink with flink sql, and one of its fields is declared as eventTime TIMESTAMP(9) WITH LOCAL
>> TIME ZONE.
>> When trying to write, the exception below was thrown; apparently the json parser cannot handle this type. How should I write a UTC timestamp type, in a format like
>> 2020-07-15T12:00:00.000Z?
>> 
>> 
>> 
>> java.lang.UnsupportedOperationException: Not support to parse type: 
>> TIMESTAMP(9) WITH LOCAL TIME ZONE
>> 
>> at 
>> org.apache.flink.formats.json.JsonRowDataSerializationSchema.createNotNullConverter(JsonRowDataSerializationSchema.java:184)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On 2020-07-15 21:24:30, "sunfulin"  wrote:
>>> hi,
>>> I see that in 1.11 java.sql.Timestamp
>>> maps to Flink's TIMESTAMP(9), which differs from the previous default of TIMESTAMP(3); moreover, in 1.10 Timestamp(3) carried the UTC time zone, while this type now has no time zone. How should this change be adapted to?
>


Re: flink 1.11 sql type question

2020-07-15 Thread Leonard Xu
Hello

Parsing UTC timestamps from JSON is supported; try specifying the JSON timestamp format in your WITH options: json.timestamp-format.standard
= 'ISO-8601'

Best
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard
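For concreteness, the option goes into the WITH clause of the table that uses the json format, roughly like this (connector options and schema are only illustrative, and the column is declared as TIMESTAMP(3) here):

CREATE TABLE es_sink (
    a VARCHAR,
    b VARCHAR,
    c TIMESTAMP(3)
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'my_index',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);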
 


> On Jul 15, 2020, at 23:19, sunfulin  wrote:
> 
> hi,
> I defined an ES sink with flink sql, and one of its fields is declared as eventTime TIMESTAMP(9) WITH LOCAL TIME
> ZONE.
> When trying to write, the exception below was thrown; apparently the json parser cannot handle this type. How should I write a UTC timestamp type, in a format like
> 2020-07-15T12:00:00.000Z?
> 
> 
> 
> java.lang.UnsupportedOperationException: Not support to parse type: 
> TIMESTAMP(9) WITH LOCAL TIME ZONE
> 
> at 
> org.apache.flink.formats.json.JsonRowDataSerializationSchema.createNotNullConverter(JsonRowDataSerializationSchema.java:184)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-07-15 21:24:30, "sunfulin"  wrote:
>> hi,
>> I see that in 1.11 java.sql.Timestamp
>> maps to Flink's TIMESTAMP(9), which differs from the previous default of TIMESTAMP(3); moreover, in 1.10 Timestamp(3) carried the UTC time zone, while this type now has no time zone. How should this change be adapted to?



Re: Print table content in Flink 1.11

2020-07-15 Thread Leonard Xu
Hi, Flavio


> On Jul 16, 2020, at 00:19, Flavio Pompermaier  wrote:
> 
> final JobExecutionResult jobRes = tableEnv.execute("test-job");
> 

In Flink 1.11, once a Table has transformed to DataStream, only 
StreamExecutionEnvironment can execute the DataStream program, please use 
env.execute(“test-job”) in this case, you can get mote information from [1].


Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
 




Using md5 hash while sinking files to s3

2020-07-15 Thread nikita Balakrishnan
Hello team,

I’m developing a system where we are trying to sink to an immutable s3
bucket. This bucket has server side encryption set as KMS. The DataStream
sink works perfectly fine when I don’t use the immutable bucket but when I
use an immutable bucket, I get exceptions regarding multipart upload
failures. It says we need to enable md5 hashing for the put object to work.

Here’s the stack trace:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
java.io.IOException: Uploading parts failed
... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
at
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
at
org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on
raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt:
com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header
is required for Put Part requests with Object Lock parameters (Service:
Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx;
S3 Extended Request ID: ), S3 Extended Request ID: xx
:InvalidRequest: Content-MD5 HTTP header is required for Put Part requests
with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error
Code: InvalidRequest; Request ID: ; S3 Extended Request ID: )
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
at
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
at
org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
at

Re: Flink Pojo Serialization for Map Values

2020-07-15 Thread Theo Diefenthal


Hi Krzysztof,

Your problems arise due to Java type erasure. If you have DataPoint with a
Map<String, BadPojo> field, all Flink's type system will see is a Map, i.e.
a raw Map without its generic parameters.

So in the first case, with DataPoint having an explicit member of type
"BadPojo", Flink will deduce "DataPoint" to be a PojoType, whereas the
"badPojo" field itself is of type GenericType<BadPojo> and
thus, that field will be serialized via Kryo.

In the second case, DataPoint will still be a PojoType and your "badPojo" field
will also be a GenericType, but this time a GenericType<Map>.
So there are no complaints about BadPojo here, because the entire map is already serialized
via Kryo and Flink doesn't deduce any further and doesn't see the
BadPojo at all. => No win for you :)

In the second case, you need to explicitly tell flink that your "badPojo" field 
is a map and should be detected as "MapType" from flink. If Flink detects it as 
MapType, it will again complain about BadPojo itself and you are back to the 
roots and still need to fix the BadPojo to finally avoid kryo. :)  

I wrote myself a small utility function once when I had to tell flink about a 
POJO from an external library that contained a Map in order to serialize it 
efficiently:



/**
 * Flink has a Types.POJO function where next to a class, one can specify a 
Map with fieldnames and types to be set.
 * This function is kind of a utility. Where Flink Types.POJO function 
creates the POJO type for the class only
 * with the fields specified, this function here creates the POJOType 
normally, but replaces the type of the
 * provided field with the provided type and keeps all other fields as they 
are generated normally.
 * @throws org.apache.flink.api.common.functions.InvalidTypesException
 */
public static <T> TypeInformation<T> pojoTypeWithElementReplacement(
        Class<T> pojo, String replacementFieldName, TypeInformation<?> replacementFieldType) {
    final PojoTypeInfo<T> pojoType = (PojoTypeInfo<T>) Types.POJO(pojo);

    final Map<String, TypeInformation<?>> pojoFieldTypes = IntStream
            .range(0, pojoType.getArity())
            .mapToObj(fieldNr -> pojoType.getPojoFieldAt(fieldNr))
            .collect(Collectors.toMap(
                    pojoField -> pojoField.getField().getName(),
                    pojoField -> pojoField.getTypeInformation()
            ));

    final TypeInformation<?> oldTypeForElements = pojoFieldTypes.remove(replacementFieldName);
    if (oldTypeForElements == null) {
        throw new org.apache.flink.api.common.functions.InvalidTypesException(
                "Expected " + replacementFieldName + " field to exist in order to replace it properly with custom type infos");
    }
    pojoFieldTypes.put(replacementFieldName, replacementFieldType);

    return Types.POJO(pojo, pojoFieldTypes);
}


In your case, you could call it like this:

TypeInformation<DataPoint> pojoMapType = 
FlinkTypeHints.pojoTypeWithElementReplacement(DataPoint.class, "badPojo", 
Types.MAP(Types.STRING, TypeExtractor.createTypeInfo(BadPojo.class)));

a bit less verbose if BadPojo would really be a PojoType:

TypeInformation<DataPoint> pojoMapType = 
FlinkTypeHints.pojoTypeWithElementReplacement(DataPoint.class, "badPojo", 
Types.MAP(Types.STRING, Types.POJO(BadPojo.class)));

If the POJO is e.g. returned from a mapFunction, you can write something like 
stream.map(myMapFunctionReturningDataPoint).returns(pojoMapType);



Note that I wrote this for Flink 1.9. I read somewhere that Flink now has a new 
type system somehow, somewhere but I didn't check this out yet and have no idea 
what changed. 

Best regards
Theo




----- Original Message -----
From: "KristoffSC" 
To: "user" 
Sent: Monday, July 13, 2020 23:06:20
Subject: Flink Pojo Serialization for Map Values

Hi,
I would like to ask about Flink Pojo Serialization as described in [1]

I have a case where my custom event source produces Events described by
Pojo:

public class DataPoint
{
public long timestamp;
public double value;
public BadPojo badPojo = new BadPojo();

public DataPoint() {}

}

Where BadPojo class is something like this:
public class BadPojo {

private final String fieldA = "X";
}

So this is case where Flink, using default configuration should fall back to
Kryo, and it does.
In logs I can see entries:
org.apache.flink.api.java.typeutils.TypeExtractor - class
org.home.streaming.events.BadPojo does not contain a getter for field fieldA

So this is an expected result.

However when I change DataPoint class to use:
public Map<String, BadPojo> badPojo = new HashMap<>();

instead of the direct BadPojo field, I no longer see logs complaining about the BadPojo
class.

In this case DataPoint class looks like this:
public class DataPoint
{
public long timestamp;
public double value;
public Map<String, BadPojo> badPojo = new HashMap<>();

public DataPoint() {}

}

My questions:
1. What actually happens here?
2. Which serializer is used by Flink?
3. How Maps 

Re: flink app crashed

2020-07-15 Thread Rainie Li
These are the console log after launch the app:

2020-07-15 19:25:28,507 INFO
 org.apache.flink.yarn.AbstractYarnClusterDescriptor   - YARN
application has been deployed successfully.
Starting execution of program
---Environment Variables-
DOCKER_CONFIG=/etc/.docker
FLINK_BIN_DIR=/usr/local/flink-1.9.1/bin
FLINK_CONF_DIR=/etc/flink-1.9.1/conf/
FLINK_LIB_DIR=/usr/local/flink-1.9.1/lib
FLINK_LOG_DIR=/home/karthik/pincohesion
FLINK_OPT_DIR=/usr/local/flink-1.9.1/opt
FLINK_PLUGINS_DIR=/usr/local/flink-1.9.1/plugins
HADOOP_CLASSPATH=/usr/local/hadoop/etc/hadoop:/usr/local/hadoop/share/hadoop/common/lib/*:/usr/local/hadoop/share/hadoop/common/*:/usr/local/hadoop/share/hadoop/hdfs:/usr/local/hadoop/share/hadoop/hdfs/lib/*:/usr/local/hadoop/share/hadoop/hdfs/*:/usr/local/hadoop/share/hadoop/yarn/lib/*:/usr/local/hadoop/share/hadoop/yarn/*:/usr/local/hadoop/share/hadoop/mapreduce/lib/*:/usr/local/hadoop/share/hadoop/mapreduce/*:/usr/local/hadoop/etc/hadoop:/usr/local/hadoop/share/hadoop/common/lib/*:/usr/local/hadoop/share/hadoop/common/*:/usr/local/hadoop/share/hadoop/hdfs:/usr/local/hadoop/share/hadoop/hdfs/lib/*:/usr/local/hadoop/share/hadoop/hdfs/*:/usr/local/hadoop/share/hadoop/yarn/lib/*:/usr/local/hadoop/share/hadoop/yarn/*:/usr/local/hadoop/share/hadoop/mapreduce/lib/*:/usr/local/hadoop/share/hadoop/mapreduce/*:/usr/local/hadoop/etc/hadoop:/usr/local/hadoop/share/hadoop/common/lib/*:/usr/local/hadoop/share/hadoop/common/*:/usr/local/hadoop/share/hadoop/hdfs:/usr/local/hadoop/share/hadoop/hdfs/lib/*:/usr/local/hadoop/share/hadoop/hdfs/*:/usr/local/hadoop/share/hadoop/yarn/lib/*:/usr/local/hadoop/share/hadoop/yarn/*:/usr/local/hadoop/share/hadoop/mapreduce/lib/*:/usr/local/hadoop/share/hadoop/mapreduce/*:/usr/local/hadoop/etc/hadoop:/usr/local/hadoop/share/hadoop/common/lib/*:/usr/local/hadoop/share/hadoop/common/*:/usr/local/hadoop/share/hadoop/hdfs:/usr/local/hadoop/share/hadoop/hdfs/lib/*:/usr/local/hadoop/share/hadoop/hdfs/*:/usr/local/hadoop/share/hadoop/yarn/lib/*:/usr/local/hadoop/share/hadoop/yarn/*:/usr/local/hadoop/share/hadoop/mapreduce/lib/*:/usr/local/hadoop/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar:/usr/local/hadoop/share/hadoop/tools/lib/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar:/usr/local/hadoop/share/hadoop/tools/lib/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar:/usr/local/hadoop/share/hadoop/tools/lib/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar:/usr/local/hadoop/share/hadoop/tools/lib/*
HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
HADOOP_HOME=/usr/local/hadoop
HISTFILE=/home/rainieli/.bash_history
HISTFILESIZE=2000
HISTIGNORE=
HISTSIZE=1000
HOME=/home/rainieli
JAVA_HOME=/usr/lib/jvm/java-8-oracle
LANG=C.UTF-8
LC_TERMINAL=iTerm2
LC_TERMINAL_VERSION=3.3.9
LESSCLOSE=/usr/bin/lesspipe %s %s
LESSOPEN=| /usr/bin/lesspipe %s
LOGNAME=rainieli
LS_COLORS=rs=0:di=01;34:ln=01;36:mh=00:pi=40;33:so=01;35:do=01;35:bd=40;33;01:cd=40;33;01:or=40;31;01:mi=00:su=37;41:sg=30;43:ca=30;41:tw=30;42:ow=34;42:st=37;44:ex=01;32:*.tar=01;31:*.tgz=01;31:*.arc=01;31:*.arj=01;31:*.taz=01;31:*.lha=01;31:*.lz4=01;31:*.lzh=01;31:*.lzma=01;31:*.tlz=01;31:*.txz=01;31:*.tzo=01;31:*.t7z=01;31:*.zip=01;31:*.z=01;31:*.Z=01;31:*.dz=01;31:*.gz=01;31:*.lrz=01;31:*.lz=01;31:*.lzo=01;31:*.xz=01;31:*.zst=01;31:*.tzst=01;31:*.bz2=01;31:*.bz=01;31:*.tbz=01;31:*.tbz2=01;31:*.tz=01;31:*.deb=01;31:*.rpm=01;31:*.jar=01;31:*.war=01;31:*.ear=01;31:*.sar=01;31:*.rar=01;31:*.alz=01;31:*.ace=01;31:*.zoo=01;31:*.cpio=01;31:*.7z=01;31:*.rz=01;31:*.cab=01;31:*.wim=01;31:*.swm=01;31:*.dwm=01;31:*.esd=01;31:*.jpg=01;35:*.jpeg=01;35:*.mjpg=01;35:*.mjpeg=01;35:*.gif=01;35:*.bmp=01;35:*.pbm=01;35:*.pgm=01;35:*.ppm=01;35:*.tga=01;35:*.xbm=01;35:*.xpm=01;35:*.tif=01;35:*.tiff=01;35:*.png=01;35:*.svg=01;35:*.svgz=01;35:*.mng=01;35:*.pcx=01;35:*.mov=01;35:*.mpg=01;35:*.mpeg=01;35:*.m2v=01;35:*.mkv=01;35:*.webm=01;35:*.ogm=01;35:*.mp4=01;35:*.m4v=01;35:*.mp4v=01;35:*.vob=01;35:*.qt=01;35:*.nuv=01;35:*.wmv=01;35:*.asf=01;35:*.rm=01;35:*.rmvb=01;35:*.flc=01;35:*.avi=01;35:*.fli=01;35:*.flv=01;35:*.gl=01;35:*.dl=01;35:*.xcf=01;35:*.xwd=01;35:*.yuv=01;35:*.cgm=01;35:*.emf=01;35:*.ogv=01;35:*.ogx=01;35:*.aac=00;36:*.au=00;36:*.flac=00;36:*.m4a=00;36:*.mid=00;36:*.midi=00;36:*.mka=00;36:*.mp3=00;36:*.mpc=00;36:*.ogg=00;36:*.ra=00;36:*.wav=00;36:*.oga=00;36:*.opus=00;36:*.spx=00;36:*.xspf=00;36:
MAIL=/var/mail/rainieli
OLDPWD=/home/rainieli
PATH=/usr/lib/jvm/java-8-oracle/bin:/usr/lib/jvm/java-8-oracle/bin:/usr/lib/jvm/java-8-oracle/bin:/usr/lib/jvm/java-8-oracle/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/hadoop/bin:/usr/local/hadoop/bin
PWD=/home/karthik
SHELL=/bin/bash
SHLVL=1
SSH_CLIENT=172.16.11.92 64705 22
SSH_CONNECTION=172.16.11.92 64705 10.2.66.110 22
SSH_TTY=/dev/pts/2
S_COLORS=auto
TERM=xterm-256color
USER=rainieli
---Command Line Arguments-
[--conf-file, PIN_JOIN_pin_cohesion_realtime_signal.prod.properties]
Current working directory: 

Re: Flink Pojo Serialization for Map Values

2020-07-15 Thread KristoffSC
Hi,
Any ideas about that one?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink app crashed

2020-07-15 Thread Rainie Li
Thank you, Jesse.

Here are more log info:

2020-07-15 18:19:36,456 INFO  org.apache.flink.client.cli.CliFrontend
-

2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, localhost
2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.size, 1024m
2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.size, 1024m
2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2020-07-15 18:19:36,461 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-07-15 18:19:36,463 WARN  org.apache.flink.client.cli.CliFrontend
- Could not load CLI class
org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError:
org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 5 more
2020-07-15 18:19:36,519 INFO  org.apache.flink.core.fs.FileSystem
- Hadoop is not in the classpath/dependencies. The extended
set of supported File Systems via Hadoop is not availab\
le.
2020-07-15 18:19:36,647 INFO
 org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
create Hadoop Security Module because Hadoop cannot be found in the
Classpath.
2020-07-15 18:19:36,658 INFO
 org.apache.flink.runtime.security.SecurityUtils   - Cannot
install HadoopSecurityContext because Hadoop cannot be found in the
Classpath.


Best regards
Rainie

On Wed, Jul 15, 2020 at 11:49 AM Jesse Lord  wrote:

> Hi Rainie,
>
>
>
> I am relatively new to flink, but I suspect that your error is somewhere
> else in the log. I have found most of my problems by doing a search for the
> word “error” or “exception”. Since all of these log lines are at the info
> level, they are probably not highlighting any real issues. If you send more
> of the log or find an error line, that might help others debug.
>
>
>
> Thanks,
>
> Jesse
>
>
>
> *From: *Rainie Li 
> *Date: *Wednesday, July 15, 2020 at 10:54 AM
> *To: *"user@flink.apache.org" 
> *Subject: *flink app crashed
>
>
>
> Hi All,
>
>
>
> I am new to Flink. Any idea why my Flink app's Job Manager is stuck? Here is the
> bottom part of the Job Manager log. Any suggestion will be appreciated.
>
> 2020-07-15 16:49:52,749 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint
> for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
> akka://flink/user/dispatcher .
>
> 2020-07-15 16:49:52,759 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>
> 2020-07-15 16:49:52,759 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>
> 2020-07-15 16:49:52,762 INFO
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService -
> Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
>
> 2020-07-15 16:49:52,790 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Dispatcher
> /user/dispatcher was granted leadership with fencing token
>
> 2020-07-15 16:49:52,791 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Recovering all
> persisted jobs.
>
> 2020-07-15 16:49:52,931 INFO
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing
> over to rm1
>
> 2020-07-15 16:49:53,014 INFO org.apache.flink.yarn.YarnResourceManager -
> Recovered 0 containers from 

Re: flink app crashed

2020-07-15 Thread Jesse Lord
Hi Rainie,

I am relatively new to flink, but I suspect that your error is somewhere else 
in the log. I have found most of my problems by doing a search for the word 
“error” or “exception”. Since all of these log lines are at the info level, 
they are probably not highlighting any real issues. If you send more of the log 
or find an error line, that might help others debug.

Thanks,
Jesse

From: Rainie Li 
Date: Wednesday, July 15, 2020 at 10:54 AM
To: "user@flink.apache.org" 
Subject: flink app crashed

Hi All,

I am new to Flink. Any idea why my Flink app's Job Manager is stuck? Here is the bottom 
part of the Job Manager log. Any suggestion will be appreciated.
2020-07-15 16:49:52,749 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - 
Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/dispatcher .
2020-07-15 16:49:52,759 INFO 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2020-07-15 16:49:52,759 INFO 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - 
Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2020-07-15 16:49:52,762 INFO 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - 
Starting ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
2020-07-15 16:49:52,790 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Dispatcher 
/user/dispatcher was granted leadership with fencing token
2020-07-15 16:49:52,791 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Recovering all 
persisted jobs.
2020-07-15 16:49:52,931 INFO 
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over 
to rm1
2020-07-15 16:49:53,014 INFO org.apache.flink.yarn.YarnResourceManager - 
Recovered 0 containers from previous attempts ([]).
2020-07-15 16:49:53,018 INFO 
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl - Upper bound of 
the thread pool size is 500
2020-07-15 16:49:53,020 INFO 
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - 
yarn.client.max-cached-nodemanagers-proxies : 0
2020-07-15 16:49:53,021 INFO 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - 
Starting ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
2020-07-15 16:49:53,042 INFO org.apache.flink.yarn.YarnResourceManager - 
ResourceManager akka.tcp://flink@cluster-dev-001/user/resourcemanager was 
granted leadership with fencing token
2020-07-15 16:49:53,046 INFO 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting 
the SlotManager.
2020-07-15 16:50:52,217 INFO org.apache.kafka.clients.Metadata - Cluster ID: 
FZrfSqHiTpaZwEzIRYkCLQ


Thanks
Best regards
Rainie


How to write junit testcases for KeyedBroadCastProcess Function

2020-07-15 Thread bujjirahul45
Hi,

I am new to Flink and I am trying to write JUnit test cases for a
KeyedBroadcastProcessFunction. Below is my code. I am currently calling the
getDataStreamOutput method in the TestUtils class, passing input data and
pattern rules to the method. Once the input data has been evaluated against the
list of pattern rules and the input data satisfies the condition, I get the
signal, call the sink function, and return the output data as a string from the
getDataStreamOutput method.

    @Test
    public void testCompareInputAndOutputDataForInputSignal() throws Exception {
        Assertions.assertEquals(sampleInputSignal,
                TestUtils.getDataStreamOutput(
                        inputSignal,
                        patternRules));
    }


    public static String getDataStreamOutput(JSONObject input, Map<String, String> patternRules) throws Exception {

        env.setParallelism(1);

        DataStream<JSONObject> inputSignal = env.fromElements(input);

        DataStream<Map<String, String>> rawPatternStream =
                env.fromElements(patternRules);

        // Generate a key/value pair per pattern where the key is the
        // pattern name and the value is the pattern condition
        DataStream<Tuple2<String, Map<String, String>>> patternRuleStream =
                rawPatternStream.flatMap(
                        new FlatMapFunction<Map<String, String>, Tuple2<String, Map<String, String>>>() {
                            @Override
                            public void flatMap(Map<String, String> patternRules,
                                    Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
                                for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
                                    JSONObject jsonObject = new JSONObject(stringEntry.getValue());
                                    Map<String, String> map = new HashMap<>();
                                    for (String key : jsonObject.keySet()) {
                                        String value = jsonObject.get(key).toString();
                                        map.put(key, value);
                                    }
                                    out.collect(new Tuple2<>(stringEntry.getKey(), map));
                                }
                            }
                        });

        // patternRuleDescriptor is the MapStateDescriptor defined elsewhere in TestUtils
        BroadcastStream<Tuple2<String, Map<String, String>>> patternRuleBroadcast =
                patternRuleStream.broadcast(patternRuleDescriptor);

        DataStream<Tuple2<String, JSONObject>> validSignal = inputSignal
                .map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
                    @Override
                    public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
                        String source = inputSignal.getSource();
                        return new Tuple2<>(source, inputSignal);
                    }
                })
                .keyBy(0)
                .connect(patternRuleBroadcast)
                .process(new MyKeyedBroadCastProcessFunction());

        validSignal.map(new MapFunction<Tuple2<String, JSONObject>, JSONObject>() {
            @Override
            public JSONObject map(Tuple2<String, JSONObject> inputSignal) throws Exception {
                return inputSignal.f1;
            }
        }).addSink(new getDataStreamOutput());

        env.execute("TestFlink");

        return (getDataStreamOutput.dataStreamOutput);
    }


    @SuppressWarnings("serial")
    public static final class getDataStreamOutput implements SinkFunction<JSONObject> {
        public static String dataStreamOutput;

        @Override
        public void invoke(JSONObject inputSignal) throws Exception {
            dataStreamOutput = inputSignal.toString();
        }
    }
I need to test different inputs with the same broadcast rules, but each time I
call this function it repeats the whole process from the beginning: take the
input signal, broadcast the data, and so on. Is there a way I can broadcast once
and keep sending inputs to the method? I explored using a CoFlatMapFunction,
something like below, to combine the data streams and keep sending input rules
while the method is running, but for this one of the data streams has to keep
getting data from a Kafka topic, which again overburdens the method with loading
Kafka utilities and a server.

    DataStream<JSONObject> inputSignalFromKafka =
            env.addSource(inputSignalKafka);

    DataStream<JSONObject> inputSignalFromMethod =
            env.fromElements(inputSignal);

    DataStream<JSONObject> inputSignal =
            inputSignalFromMethod.connect(inputSignalFromKafka)
                    .flatMap(new SignalCoFlatMapper());


    public static class SignalCoFlatMapper
            implements CoFlatMapFunction<JSONObject, JSONObject, JSONObject> {

        @Override
        public void flatMap1(JSONObject inputValue, Collector<JSONObject> out) throws Exception {
            out.collect(inputValue);
        }

        @Override
        public void flatMap2(JSONObject kafkaValue, Collector<JSONObject> out) throws Exception {
            out.collect(kafkaValue);
        }
    }
I found a link on Stack Overflow, "How to unit test BroadcastProcessFunction
in flink when processElement depends on broadcasted data", but it confused me
a lot.

Is there any way I can broadcast only once, in a @Before method of the test
class, and keep sending different kinds of data to my broadcast function?


Thanks,
Rahul.


Performance test Flink vs Storm

2020-07-15 Thread Prasanna kumar
Hi,

We are testing flink and storm for our streaming pipelines on various
features.

In terms of latency, I see that Flink comes up short against Storm even if more
CPU is given to it. I will explain in detail.

*Machine*: t2.large, 4 cores, 16 GB, used for the Flink task manager
and the Storm supervisor node.
*Kafka Partitions* 4
*Messages tested:* 1million
*Load* : 50k/sec

*Scenario*:
Read from Kafka -> Transform (Map to a different JSON format) - > Write to
a Kafka topic.

*Test 1*
Storm Parallelism is set as 1. There are four processes. 1 Spout (Read from
Kafka) and 3 bolts (Transformation and sink) .
Flink. Operator level parallelism not set. Task Parallelism is set as 1.
Task slot is 1 per core.

Storm was 130 milliseconds faster in 1st record.
Storm was 20 seconds faster in 1 millionth record.

*Test 2*
Storm Parallelism is set as 1. There are four processes. 1 Spout (Read from
Kafka) and 3 bolts (Transformation and sink)
Flink. Operator level parallelism not set. Task Parallelism is set as 4.
Task slot is 1 per core. So all cores are used.

Storm was 180 milliseconds faster in 1st record.
Storm was 25 seconds faster in 1 millionth record.

*Observations here*
1) Increasing parallelism did not increase the performance in Flink; rather,
it became 50 ms to 5 s slower.
2) Flink is slower in reading from Kafka compared to Storm. That's where the
bulk of the latency is: for the millionth record it is 19-24 seconds slower.
3) Once a message is read, Flink takes less time to transform and write to
Kafka compared to Storm.

*Other Flink Config*
jobmanager.heap.size: 1024m

taskmanager.memory.process.size: 1568m

*How do we improve the latency?*
*Why does latency become worse when parallelism is increased and matched
to the number of partitions?*
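
Is the network buffer timeout the kind of setting we should be looking at here? A
minimal sketch of what I mean (the 5 ms value is only an illustration, not something
we have measured):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTuning {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink ships a network buffer when it is full or when this timeout fires
        // (default 100 ms). Lowering it trades some throughput for lower latency;
        // 0 flushes after every record.
        env.setBufferTimeout(5);
        // ... build the Kafka -> transform -> Kafka pipeline here, then env.execute(...)
    }
}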

Thanks,
Prasanna.


flink app crashed

2020-07-15 Thread Rainie Li
Hi All,

I am new to Flink. Any idea why my Flink app's Job Manager is stuck? Here is the
bottom part of the Job Manager log. Any suggestion will be appreciated.
2020-07-15 16:49:52,749 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint
for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/dispatcher .
2020-07-15 16:49:52,759 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2020-07-15 16:49:52,759 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2020-07-15 16:49:52,762 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService -
Starting ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
2020-07-15 16:49:52,790 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Dispatcher
/user/dispatcher was granted leadership with fencing token
2020-07-15 16:49:52,791 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Recovering all
persisted jobs.
2020-07-15 16:49:52,931 INFO
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing
over to rm1
2020-07-15 16:49:53,014 INFO org.apache.flink.yarn.YarnResourceManager -
Recovered 0 containers from previous attempts ([]).
2020-07-15 16:49:53,018 INFO
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl - Upper
bound of the thread pool size is 500
2020-07-15 16:49:53,020 INFO
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy -
yarn.client.max-cached-nodemanagers-proxies : 0
2020-07-15 16:49:53,021 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService -
Starting ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
2020-07-15 16:49:53,042 INFO org.apache.flink.yarn.YarnResourceManager -
ResourceManager akka.tcp://flink@cluster-dev-001/user/resourcemanager was
granted leadership with fencing token
2020-07-15 16:49:53,046 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl -
Starting the SlotManager.
2020-07-15 16:50:52,217 INFO org.apache.kafka.clients.Metadata - Cluster
ID: FZrfSqHiTpaZwEzIRYkCLQ


Thanks
Best regards
Rainie


Print table content in Flink 1.11

2020-07-15 Thread Flavio Pompermaier
Hi to all,
I'm trying to read and print out the content of my parquet directory with
Flink 1.11 (using the bridge API). However, Flink complains that there is no
topology to execute. What am I doing wrong? The exception is:

java.lang.IllegalStateException: No operators defined in streaming
topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at it.okkam.datalinks.batch.flink.ProfileTest.main(ProfileTest.java:52)

This is the code: 

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance().inStreamingMode().build());
tableEnv.executeSql(-see below  [1] );
Table inputTable = tableEnv.sqlQuery("SELECT * FROM source");
tableEnv.toAppendStream(inputTable,
   new RowTypeInfo(inputTable.getSchema().getFieldTypes())).print();
final JobExecutionResult jobRes = tableEnv.execute("test-job");

[1] --
CREATE TABLE `source` (
`col1` BIGINT,
`col2` STRING
) WITH (
'connector' = 'filesystem',
'format' = 'parquet',
'update-mode' = 'append',
'path' = '/tmp/parquet-test',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='success-file',
'format.parquet.compression'='snappy',
'format.parquet.enable.dictionary'='true',
'format.parquet.block.size'='0',
'sink.shuffle-by-partition.enable' = 'true'
)
---

Thanks in advance,
Flavio


Re: Is there a way to access the number of 'keys' that have been used for partioning?

2020-07-15 Thread Chesnay Schepler
This information is not readily available; in fact Flink itself doesn't 
know how many keys there are at any point.

You'd have to calculate it yourself.
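
If it helps, here is a rough sketch of what "calculate it yourself" could look like
(class and metric names are made up for the example; the total is the sum of this
gauge over all parallel subtasks):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DistinctKeyCounter<K, T> extends KeyedProcessFunction<K, T, T> {

    private transient ValueState<Boolean> seen;
    private long distinctKeysOnThisSubtask;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
        // expose the per-subtask count as a gauge metric
        getRuntimeContext().getMetricGroup().gauge("distinctKeys",
                (Gauge<Long>) () -> distinctKeysOnThisSubtask);
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        if (seen.value() == null) {      // first time this key appears on this subtask
            seen.update(true);
            distinctKeysOnThisSubtask++;
        }
        out.collect(value);              // pass the element through unchanged
    }
}

You would put it right after the keyBy(), e.g.
stream.keyBy(...).process(new DistinctKeyCounter<>()).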

On 15/07/2020 17:11, orionemail wrote:

Hi,

I need to query the number of keys that a stream has been split by, is 
there a way to do this?


Thanks,

O







Status of a job when a kafka source dies

2020-07-15 Thread Nick Bendtner
Hi guys,
I want to know what is the default behavior of Kafka source when a kafka
cluster goes down during streaming. Will the job status go to failing or is
the exception caught and there is a back off before the source tries to
poll for more events ?


Best,
Nick.


Pyflink sink rowtime field

2020-07-15 Thread Jesse Lord
I am trying to sink the rowtime field in pyflink 1.10 and I get the error shown below.

For the source schema I use

.field("rowtime", DataTypes.TIMESTAMP(2))
.rowtime(
Rowtime()
.timestamps_from_field("timestamp")
.watermarks_periodic_ascending()
)

To create the rowtime field and have tried variations on

.field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())

In the sink schema.

Trying all of the different types in DataTypes I get essentially the following 
error:

py4j.protocol.Py4JJavaError: An error occurred while calling o56.insertInto.
: org.apache.flink.table.api.ValidationException: Field types of query result 
and registered TableSink `default_catalog`.`default_database`.`output` do not 
match.
Query result schema: [rowtime: LocalDateTime]
TableSink schema:[rowtime: Timestamp]


I know that in Java there is 
org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME and the python 
documentation lists Types.SQL_TIMESTAMP, but I cannot find the corresponding 
type in the python library. Can anyone help point me to the correct type for 
the schema?

Thanks,
Jesse







Re: flink 1.11 SQL type issue

2020-07-15 Thread sunfulin
hi,
I defined an ES sink via Flink SQL, and one of its fields is declared as eventTime TIMESTAMP(9) WITH LOCAL TIME 
ZONE.
When trying to write, the exception below is thrown. It looks like the JSON parser cannot handle this type. Could anyone tell me how I should write a UTC timestamp, in a format like 
2020-07-15T12:00:00.000Z?



java.lang.UnsupportedOperationException: Not support to parse type: 
TIMESTAMP(9) WITH LOCAL TIME ZONE

at 
org.apache.flink.formats.json.JsonRowDataSerializationSchema.createNotNullConverter(JsonRowDataSerializationSchema.java:184)











On 2020-07-15 21:24:30, "sunfulin" wrote:
>hi,
>I see that in 1.11 java.sql.Timestamp 
>maps to Flink's TIMESTAMP(9), which differs from the previous default of TIMESTAMP(3). Also, the TIMESTAMP(3) in 1.10 carried the UTC time zone, while now this type carries no time zone. How should we adapt to this specific change?


Is there a way to access the number of 'keys' that have been used for partioning?

2020-07-15 Thread orionemail
Hi,

I need to query the number of keys that a stream has been split by, is there a 
way to do this?

Thanks,

O

Re: map JSON to scala case class & off-heap optimization

2020-07-15 Thread Aljoscha Krettek

On 11.07.20 10:31, Georg Heiler wrote:

1) similarly to spark the Table API works on some optimized binary
representation
2) this is only available in the SQL way of interaction - there is no
programmatic API


yes it's available from SQL, but also the Table API, which is a 
programmatic declarative API, similar to Spark's Structured Streaming.




q1) I have read somewhere (I think in some Flink Forward presentations)
that the SQL API is not necessarily stable with regards to state - even
with small changes to the DAG (due to optimization). So does this also
/still apply to the table API? (I assume yes)


Yes, unfortunately this is correct. Because the Table API/SQL is 
declarative users don't have control over the DAG and the state that the 
operators have. Some work will happen on at least making sure that the 
optimizer stays stable between Flink versions or that we can let users 
pin a certain physical graph of a query so that it can be re-used across 
versions.



q2) When I use the DataSet/Stream (classical scala/java) API it looks like
I must create a custom serializer if I want to handle one/all of:

   - side-output failing records and not simply crash the job
   - as asked before automatic serialization to a scala (case) class


This is true, yes.


But I also read that creating the ObjectMapper (i.e. in Jackson terms)
inside the map function is not recommended. From Spark I know that there is
a map-partitions function, i.e. something where a database connection can
be created and then reused for the individua elements. Is a similar
construct available in Flink as well?


Yes, for this you can use "rich functions", which have an open()/close() 
method that allows initializing and re-using resources across 
invocations: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#rich-functions
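
A rough sketch of that pattern (ParseJson, MyEvent and the tag name are placeholders
for this example, not existing Flink classes): the ObjectMapper is created once per
parallel instance in open() and reused for every record, and records that fail to
parse go to a side output instead of failing the job.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ParseJson extends ProcessFunction<String, ParseJson.MyEvent> {

    // side output for records that could not be parsed
    public static final OutputTag<String> FAILED = new OutputTag<String>("failed-records") {};

    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();   // created once per parallel instance
    }

    @Override
    public void processElement(String json, Context ctx, Collector<MyEvent> out) {
        try {
            out.collect(mapper.readValue(json, MyEvent.class));
        } catch (Exception e) {
            ctx.output(FAILED, json);  // divert the bad record instead of crashing the job
        }
    }

    // placeholder target type; in Scala this could be a case class
    public static class MyEvent {
        public String name;
        public double value;
    }
}

On the main stream you would call stream.process(new ParseJson()) and read
.getSideOutput(ParseJson.FAILED) to collect the rejected records.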



Also, I have read a lot of articles and it looks like a lot of people
are using the String serializer and then manually parse the JSON which also
seems inefficient.
Where would I find an example for some Serializer with side outputs for
failed records as well as efficient initialization using some similar
construct to map-partitions?


I'm not aware of such examples, unfortunately.

I hope that at least some answers will be helpful!

Best,
Aljoscha


Re: Re: flink 1.9.2 upgraded to 1.10.0: failed job cannot recover from checkpoint

2020-07-15 Thread chenxyz



Hello Peihui,
you can check whether your case is similar to this issue; I also ran into it on 1.10.0 before.
http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html#a2239
Workarounds:
1. Using HDFS as the state backend does not trigger the error (see the sketch below)
2. Upgrading to 1.10.1 and using RocksDB also avoids the problem
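
For option 1, a quick sketch of switching the state backend in code (the HDFS path is a
placeholder; the same can also be set via state.backend in flink-conf.yaml, as in the
configuration quoted below):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HdfsBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // filesystem (HDFS) state backend instead of RocksDB
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints/wc/"));
        // ... rest of the word count job, then env.execute(...)
    }
}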














On 2020-07-14 14:41:53, "Peihui He" wrote:
>Hi Yun,
>
>I am using a word count example here: socket -> flatmap -> keyBy -> reduce ->
>print. In the flatmap, a RuntimeException is thrown whenever a specific word appears. With 1.9.2
>the job can automatically restore the state of the last checkpoint after the failure, but with 1.10.0 it cannot. The environment is Flink on
>YARN.
>
>Best wishes.
>
>Yun Tang wrote on Tuesday, July 14, 2020, 11:57 AM:
>
>> Hi Peihui
>>
>> Your exception should come from restoring an incremental checkpoint: the files had already been downloaded locally, and when creating the hard link [1] the source file was found to be missing. Very likely some other exception occurred at that moment and made the restore procedure exit, so this issue should not be the root
>> cause.
>>
>> [1]
>> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
>>
>>
>> Best regards
>> Yun Tang
>> 
>> From: Peihui He 
>> Sent: Tuesday, July 14, 2020 10:42
>> To: user-zh@flink.apache.org 
>> Subject: flink 1.9.2 upgraded to 1.10.0: failed job cannot recover from checkpoint
>>
>> hello,
>>
>> After upgrading to 1.10.0, when the program fails it tries to recover from the checkpoint, but it always fails with:
>>
>>
>> Caused by: java.nio.file.NoSuchFileException:
>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> ->
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>>
>> The configuration is the same as with 1.9.2:
>> state.backend: rocksdb
>> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
>> state.savepoints.dir: hdfs:///flink/savepoints/wc/
>> state.backend.incremental: true
>>
>> The code also contains:
>>
>> env.enableCheckpointing(1);
>>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
>> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>>
>>
>>   Is there any special configuration needed for 1.10.0?
>>


flink 1.11 SQL type issue

2020-07-15 Thread sunfulin
hi,
I see that in 1.11 java.sql.Timestamp 
maps to Flink's TIMESTAMP(9), which differs from the previous default of TIMESTAMP(3). Also, the TIMESTAMP(3) in 1.10 carried the UTC time zone, while now this type carries no time zone. How should we adapt to this specific change?

Re: Parquet format in Flink 1.11

2020-07-15 Thread Flavio Pompermaier
Ok, thanks Godfrey.

On Wed, Jul 15, 2020 at 3:03 PM godfrey he  wrote:

> hi Flavio,
>
> Parquet format supports configuration from ParquetOutputFormat.
> Please refer to [1] for details
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/parquet.html#format-options
>
> Best,
> Godfrey
>
>
>
> Flavio Pompermaier  于2020年7月15日周三 下午8:44写道:
>
>> Hi to all,
>> in my current code I use the legacy Hadoop Output format to write my
>> Parquet files.
>> I wanted to use the new Parquet format of Flink 1.11 but I can't find how
>> to migrate the following properties:
>>
>> ParquetOutputFormat.setBlockSize(job, parquetBlockSize);
>> ParquetOutputFormat.setEnableDictionary(job, true);
>> ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
>>
>> Is there a way to set those configs?
>> And if not, is there a way to handle them without modifying the source of
>> the flink connector (i.e. extending some class)?
>>
>> Best,
>> Flavio
>>
>


Re: Parquet format in Flink 1.11

2020-07-15 Thread godfrey he
hi Flavio,

Parquet format supports configuration from ParquetOutputFormat.
Please refer to [1] for details

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/parquet.html#format-options

Best,
Godfrey



Flavio Pompermaier  于2020年7月15日周三 下午8:44写道:

> Hi to all,
> in my current code I use the legacy Hadoop Output format to write my
> Parquet files.
> I wanted to use the new Parquet format of Flink 1.11 but I can't find how
> to migrate the following properties:
>
> ParquetOutputFormat.setBlockSize(job, parquetBlockSize);
> ParquetOutputFormat.setEnableDictionary(job, true);
> ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
>
> Is there a way to set those configs?
> And if not, is there a way to handle them without modifying the source of
> the flink connector (i.e. extending some class)?
>
> Best,
> Flavio
>


Parquet format in Flink 1.11

2020-07-15 Thread Flavio Pompermaier
Hi to all,
in my current code I use the legacy Hadoop Output format to write my
Parquet files.
I wanted to use the new Parquet format of Flink 1.11 but I can't find how
to migrate the following properties:

ParquetOutputFormat.setBlockSize(job, parquetBlockSize);
ParquetOutputFormat.setEnableDictionary(job, true);
ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);

Is there a way to set those configs?
And if not, is there a way to handle them without modifying the source of
the flink connector (i.e. extending some class)?

Best,
Flavio


Re: pyFlink UDTF function registration

2020-07-15 Thread Xingbo Huang
Hi Manas,
You need to join with the python udtf function. You can try the following
sql:

ddl_populate_temporary_table = f"""
INSERT INTO {TEMPORARY_TABLE}
SELECT * FROM (
SELECT monitorId, featureName, featureData, time_st
FROM {INPUT_TABLE}, LATERAL TABLE(split(data)) as T(featureName,
featureData)) t
"""

Best,
Xingbo

Manas Kale  于2020年7月15日周三 下午7:31写道:

> Hi,
> I am trying to use a UserDefined Table Function to split up some data as
> follows:
>
> from pyflink.table.udf import udtf
>
> @udtf(input_types=DataTypes.STRING(), result_types= [DataTypes.STRING(), 
> DataTypes.DOUBLE()])
> def split_feature_values(data_string):
> json_data = loads(data_string)
> for f_name, f_value in json_data.items():
> yield (f_name, f_value)
>
> # configure the off-heap memory of current taskmanager to enable the python 
> worker uses off-heap memory.
> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>  '80m')
>
> # Register UDTF
> t_env.register_function("split", split_feature_values)
> ddl_source = f"""
> CREATE TABLE {INPUT_TABLE} (
> `monitorId` STRING,
> `deviceId` STRING,
> `state` INT,
> `data` STRING,
> `time_st` TIMESTAMP(3),
> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = '{INPUT_TOPIC}',
> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
> 'format' = 'json'
> )
> """
>
> ddl_temporary_table = f"""
> CREATE TABLE {TEMPORARY_TABLE} (
> `monitorId` STRING,
> `featureName` STRING,
> `featureData` DOUBLE,
> `time_st` TIMESTAMP(3),
> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
> )
> """
>
> ddl_populate_temporary_table = f"""
> INSERT INTO {TEMPORARY_TABLE}
> SELECT monitorId, split(data), time_st
> FROM {INPUT_TABLE}
> """
>
> t_env.execute_sql(ddl_source)
> t_env.execute_sql(ddl_temporary_table)
> t_env.execute_sql(ddl_populate_temporary_table)
>
>
> However, I get the following error :
> py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
> : org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 3, column 23 to line 3, column 33:* No match found for function
> signature split()*
>
> I believe I am using the correct call to register the UDTF as per [1]. Am
> I missing something?
>
> Thanks,
> Manas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#table-functions
>


Re: Flink 1.11 test Parquet sink

2020-07-15 Thread Flavio Pompermaier
I've just opened a ticket on JIRA:
https://issues.apache.org/jira/browse/FLINK-18608

On Wed, Jul 15, 2020 at 10:10 AM Dawid Wysakowicz 
wrote:

> Hi,
>
> Unfortunately this is a bug.
>
> The problem is in CustomizedConvertRule#convertCast as it drops the
> requested nullability. It was fixed in master as part of FLINK-13784[1].
> Therefore the example works on master.
>
> Could you create a jira issue for 1.11 version? We could backport the
> corresponding part of FLINK-13784. As a workaround you can try using the
> values without registering it in the catalog, as the registration triggers
> the type check. (I know this is not perfect):
>
> final Table inputTable = tableEnv.fromValues(//
> DataTypes.ROW(//
> DataTypes.FIELD("col1", DataTypes.STRING()), //
> DataTypes.FIELD("col2", DataTypes.STRING())//
> ), ...);
> tableEnv.executeSql(//
> "CREATE TABLE `out` (\n" + //
> "col1 STRING,\n" + //
> "col2 STRING\n" + //
> ") WITH (\n" + //
> " 'connector' = 'filesystem',\n" + //
> // " 'format' = 'parquet',\n" + //
> " 'update-mode' = 'append',\n" + //
> " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
> " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
> ")");
>
> inputTable.executeInsert(`out`);
>
> As for the types SQL does not have LONG nor STRING types. Java's long is
> equivalent to SQL's BIGINT. STRING is only an alias for
> VARCHAR(Long.MAX_VALUE), which was added for improved usability so that you
> do not need to type the max long manually. For complete list of supported
> types see the docs[2]
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-13784
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html
>
> Best,
>
> Dawid
> On 15/07/2020 09:40, Flavio Pompermaier wrote:
>
> If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things works if I
> change also the query to "INSERT INTO `out` SELECT CAST(f0 AS STRING) ,f1
> FROM ParquetDataset".
> If there is still a bug fill a proper JIRA ticket with the exact
> description of the problem..
>
> Just to conclude this thread there are 2 strange things I found:
>
> 1) Is LONG really not supported yet? If I use as output table LONG,STRING
> I get
>   Exception in thread "main" java.lang.UnsupportedOperationException:
> class org.apache.calcite.sql.SqlIdentifier: LONG
>   at org.apache.calcite.util.Util.needToImplement(Util.java:967)
>
> 2) The new planner translates STRING to VARCHAR(2147483647). Is it correct?
>
> Best,
> Flavio
>
>
> On Wed, Jul 15, 2020 at 5:28 AM Jark Wu  wrote:
>
>> I think this might be a bug in `tableEnv.fromValues`.
>>
>> Could you try to remove the DataType parameter, and let the framework
>> derive the types?
>>
>> final Table inputTable = tableEnv.fromValues(
>> Row.of(1L, "Hello"), //
>> Row.of(2L, "Hello"), //
>> Row.of(3L, ""), //
>> Row.of(4L, "Ciao"));
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 15 Jul 2020 at 11:19, Leonard Xu  wrote:
>>
>>> Hi, Flavio
>>>
>>> I reproduced your issue, and I think it should be a bug. But I’m not
>>> sure it comes from Calcite or Flink shaded Calcite, Flink Table Planner
>>> module shaded calcite.
>>>
>>> Maybe Danny can help explain more.
>>>
>>> CC: Danny
>>>
>>> Best
>>> Leonard Xu
>>>
>>> 在 2020年7月14日,23:06,Flavio Pompermaier  写道:
>>>
>>> If I use
>>>
>>> final Table inputTable = tableEnv.fromValues(
>>> DataTypes.ROW(
>>> DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
>>> DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>>> ), ..
>>>   tableEnv.executeSql(//
>>> "CREATE TABLE `out` (" +
>>> "col1 STRING," +
>>> "col2 STRING" +
>>> ") WITH (...)
>>>
>>> the job works as expected but this is wrong IMHO
>>> because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
>>> If I have DataTypes.STRING().notNull() the type in the CREATE TABLE
>>> should be "STRING NOT NULL" . Am I correct?
>>>
>>> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier 
>>> wrote:
>>>
 Sorry, obviously  " 'format' = 'parquet'" + is without comment :D

 On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hi to all,
> I'm trying to test write to parquet using the following code but I
> have an error:
>
>  final TableEnvironment tableEnv =
> DatalinksExecutionEnvironment.getBatchTableEnv();
> final Table inputTable = tableEnv.fromValues(//
> DataTypes.ROW(//
> DataTypes.FIELD("col1", DataTypes.STRING()), //
> DataTypes.FIELD("col2", DataTypes.STRING())//
> ), //
> Row.of(1L, "Hello"), //
> Row.of(2L, "Hello"), //
> Row.of(3L, ""), //
> Row.of(4L, "Ciao"));
> 

pyFlink UDTF function registration

2020-07-15 Thread Manas Kale
Hi,
I am trying to use a UserDefined Table Function to split up some data as
follows:

from pyflink.table.udf import udtf

@udtf(input_types=DataTypes.STRING(), result_types=
[DataTypes.STRING(), DataTypes.DOUBLE()])
def split_feature_values(data_string):
json_data = loads(data_string)
for f_name, f_value in json_data.items():
yield (f_name, f_value)

# configure the off-heap memory of current taskmanager to enable the
python worker uses off-heap memory.
t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')

# Register UDTF
t_env.register_function("split", split_feature_values)
ddl_source = f"""
CREATE TABLE {INPUT_TABLE} (
`monitorId` STRING,
`deviceId` STRING,
`state` INT,
`data` STRING,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""

ddl_temporary_table = f"""
CREATE TABLE {TEMPORARY_TABLE} (
`monitorId` STRING,
`featureName` STRING,
`featureData` DOUBLE,
`time_st` TIMESTAMP(3),
WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
)
"""

ddl_populate_temporary_table = f"""
INSERT INTO {TEMPORARY_TABLE}
SELECT monitorId, split(data), time_st
FROM {INPUT_TABLE}
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_temporary_table)
t_env.execute_sql(ddl_populate_temporary_table)


However, I get the following error :
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 3, column 23 to line 3, column 33:* No match found for function
signature split()*

I believe I am using the correct call to register the UDTF as per [1]. Am I
missing something?

Thanks,
Manas

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#table-functions


Re: flink 1.11 SQL Kafka: extracting event time

2020-07-15 Thread Benchao Li
I think this can be solved with a computed column: you just need to make sure the rowtime column is not null when it is computed, and if it is null, you could set a default value or something like that?

18500348...@163.com <18500348...@163.com> wrote on Wednesday, July 15, 2020, 3:04 PM:

> Hi everyone!
>
> I am using Flink 1.11 SQL to consume Kafka with format csv
> and extract the event time from the eventTime field:
> rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, '-MM-dd
> HH:mm:ss'))
> eventTime may contain dirty data (values that are not 13-digit millisecond timestamps). With 'csv.ignore-parse-errors' = 'true',
> eventTime is then set to null, and an exception is thrown:
> Caused by: java.lang.RuntimeException: RowTime field should not be null,
> please convert it to a non-null long value.
>
> Is there a good way to solve this?
>
>
> Best!
>
>
>
> 18500348...@163.com
>


-- 

Best,
Benchao Li


Pravega connector cannot recover from the checkpoint due to "Failure to finalize checkpoint"

2020-07-15 Thread B.Zhou
Hi community,
To give some background, https://github.com/pravega/flink-connectors is a 
Pravega connector for Flink. The ReaderCheckpointHook[1] class uses the Flink 
`MasterTriggerRestoreHook` interface to trigger the Pravega checkpoint during 
Flink checkpoints to ensure data recovery. We experienced failures 
with checkpoint recovery during the latest Flink 1.11 upgrade; there are some 
timeout issues from continuous checkpoint failures on some of the test cases.
Error stacktrace:
2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  
o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint 
acknowledgement message
org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the 
pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
 at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has 
not been fully acknowledged yet
 at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
 at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
 ... 9 common frames omitted
After some investigation, the main problem is found. It is about the checkpoint 
recovery. When Flink CheckpointCoordinator wants to finalize a checkpoint, it 
needs to check everything is acknowledged, but for some reason, the master 
state still has our ReaderCheckpointHook remaining unacknowledged, hence leading to the 
checkpoint failure in the completion stage.
In the PendingCheckpoint::snapshotMasterState, there is an async call to 
acknowledge the master state for each hook. But it returned before the 
acknowledgement.
I think it might be related to the latest changes of the thread model of the 
checkpoint coordinator. Can someone help to verify?

Reproduce procedure:
Checkout this branch 
https://github.com/crazyzhou/flink-connectors/tree/brian-1.11-test and run 
below test case: FlinkPravegaReaderSavepointITCase::testPravegaWithSavepoint

[1] 
https://github.com/pravega/flink-connectors/blob/e15cfe5b7917431ce8672cf9f232cb4603d8143a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java

Best Regards,
Brian



Re: Re: Re: flink state

2020-07-15 Thread zhao liang
Broadcast 
state cannot meet your requirement. You probably can only do what I did: merge the relevant state data into the data stream and distinguish the different record types inside the operator, which amounts to manually maintaining the changes of this broadcast stream.

From: Robert.Zhang <173603...@qq.com>
Date: Wednesday, July 15, 2020 15:22
To: user-zh , user-zh@flink.apache.org 

Subject: Re: Re: flink state
It is like this: the problem is that I need to use keyed state to modify the broadcast state, for example storing, based on the keyed 
state, certain keys that satisfy a condition into this broadcast state, and then using this broadcast state in other operators' computations, e.g. these keys are needed there.
The documentation says the non-broadcast side cannot modify the broadcast state; it is read-only,
so it seems this cannot be implemented directly.





------ Original message ------
From: "zhao liang"
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian


Robert.Zhang <173603...@qq.com> wrote on Monday, July 13, 2020, 9:50 PM:

 Hello all,
 I currently have a problem in my stream:
 I want to use a global state across all keyed streams, or a global
 parameter. The main requirement is that this state is mutable; it needs to be modified and be visible to all stream
 operators. Has anyone encountered a similar scenario, or can you offer some ideas? Much appreciated.


 Best regards


Re: Communicating with my operators

2020-07-15 Thread Chesnay Schepler

Using an S3 bucket containing the configuration is the way to go.

1) web sockets, or more generally all approaches where you connect to 
the source


The JobGraph won't help you; it doesn't contain the information on where 
tasks are deployed to at runtime. It is just an abstract representation 
of your job.


You could theoretically retrieve the actual location through the REST 
API, and maybe expose the port as a metric.


But then you still have to deal with resolving IPs, internal/external 
IPs and all that jazz.


2) CoProcessFunction

We still have to get the data in somehow; so you'd need to have some 
source in any case :)


3) ParameterTool

This is really just a parsing tool, so it won't help for this use-case.

4) State Processing API

A bit too complicated. If restarting jobs is an option, you could just 
encode the commands into the source, emit them as events of sorts, and 
the process function updates its state on reception of these events.
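
For the S3-bucket + broadcast-state option, a rough sketch of the wiring (the config
source and the BucketEvent type are placeholders for whatever reads the bucket):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BucketConfigWiring {

    // broadcast state: bucket name -> region (or any other per-bucket config)
    static final MapStateDescriptor<String, String> BUCKETS =
            new MapStateDescriptor<>("buckets", Types.STRING, Types.STRING);

    public static DataStream<String> wire(DataStream<String> mainStream,
                                          DataStream<BucketEvent> configStream) {
        BroadcastStream<BucketEvent> broadcast = configStream.broadcast(BUCKETS);

        return mainStream.connect(broadcast)
                .process(new BroadcastProcessFunction<String, BucketEvent, String>() {

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // read-only view of the current bucket list
                        if (ctx.getBroadcastState(BUCKETS).contains(value)) {
                            out.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(BucketEvent event, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // only this side may modify the broadcast state
                        if (event.added) {
                            ctx.getBroadcastState(BUCKETS).put(event.name, event.region);
                        } else {
                            ctx.getBroadcastState(BUCKETS).remove(event.name);
                        }
                    }
                });
    }

    /** Placeholder for an add/remove command read from the config bucket. */
    public static class BucketEvent {
        public String name;
        public String region;
        public boolean added;
    }
}

The command line tool then only writes to the config bucket; the job picks up the
change on the next poll of the config source.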


On 15/07/2020 10:00, Tom Wells wrote:

Hi Everyone

I'm looking for some advice on designing my operators (which 
unsurprisingly tend to take the form of SourceFunctions, 
ProcessFunctions or SinkFunctions) to allow them to be "dynamically 
configured" while running.


By way of example, I have a SourceFunction which collects the names of 
various S3 buckets, and then a ProcessFunction which reads and 
collects their contents. The gotcha is that the list of S3 buckets is 
not fixed, and can be changed during the lifetime of the job. This 
add/remove action would be done by some human administrator, and lets 
say using a simple command line tool.


For example - here is an idea of what I want to build to "communicate" 
with my flink job:


```
# Add a bucket to the flink job to process
$ ./admin-tool add-bucket --name my-s3-bucket --region eu-west-1 
--access-key ...


# Get a list of the s3 buckets we're currently processing, and when 
last they were last accessed

$ ./admin-tool list-buckets
my-s3-bucket | eu-west-1 | 5 seconds ago

# Remove buckets
$ ./admin-tool remove-bucket --name my-s3-bucket
```

Hope that gives you an idea - of course this could apply to any number 
of different source types, and could even extend to configuration of 
sinks etc too.


So - how should my command line tool communicate with my operators?

4 alternative approaches I've thought about:

- Have a SourceFunction open a websocket and listen for bucket 
add/remove commands (written to by the command line tool). I think 
this would work, but the difficulty is in figuring out where exactly 
the SourceFunction might be deployed in the flink cluster to find the 
websocket listening port. I took a look at the ClusterClient API and 
it's possibly available by inspecting the JobGraph... I'm just not 
sure if this is an anti-pattern?


- Use a CoProcessFunction instead, and have it be joined with a 
DataStream that I can somehow write to directly from the command line 
tool (maybe using flink-client api - can i write to a DataStream 
directly??). I don't think this is possible but would feel like a good 
clean approach?


- Somehow using the ParameterTool. I don't think it supports a dynamic 
use-case though?


- Writing directly to the saved state of a ProcessFunction to add the 
remove bucket names. I'm pretty unfamiliar with this approach - but 
looks possible according to the docs on the State Processor API - 
however it seems like I would have to read the savepoint, write the 
updates, then restore from savepoint which may mean suspending and 
resuming the job while that happens. Not really an issue for me, but 
does feel like possibly the wrong approach for my simple requirement.


- Solve it just using datasources - e.g. create a centrally read s3 
bucket which holds the latest configuration and is sourced and joined 
by every operator (probably using Broadcast State). My command line 
tool would then just have to write to that S3 bucket - no need to 
communicate directly with the operators.


The last option is fairly obvious and probably my default approach - 
I'm just wondering if whether any of the alternatives above are worth 
investigating. (Especially considering my endless quest to learn 
everything about Flink - i don't mind exploring the less obvious 
pathways).


I would love some guidance or advice on what you feel is the best 
approach / idiomatic approach for this.


All the best,
Tom





springboot 2.3.1 + flink 1.11.0: how to pass an external application.yml configuration file after integration?

2020-07-15 Thread vw17
Hi,
For project reasons we have integrated Spring Boot and Flink,
but some project-related configuration lives in application.yml, and the production environment needs to change some of it. The usual approach so far was to use
-Dspring.config.location=xxx at startup to point to an external configuration file. Now I would like to know whether starting the jar with flink run
can support this way of specifying the file?
Thanks



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink1.11.0window

2020-07-15 Thread 奇怪的不朽琴师



Re: pyflink1.11.0window

2020-07-15 Thread Shuiqiang Chen
The example below reads JSON data from Kafka, performs a windowed aggregation and writes the result to ES. You can use it as a reference for the code structure and adjust the data fields accordingly. This code will probably not run on your machine as-is.

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING())
def platform_code_to_name(code):
return "mobile" if code == 0 else "pc"


def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env,
environment_settings=env_settings)

source_ddl = """
CREATE TABLE payment_msg(
createTime VARCHAR,
rt as TO_TIMESTAMP(createTime),
orderId BIGINT,
payAmount DOUBLE,
payPlatform INT,
paySource INT,
WATERMARK FOR rt as rt - INTERVAL '2' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'payment_msg_2',
  'connector.properties.bootstrap.servers' = '0.0.0.0:9092',
  'connector.properties.group.id' = 'test_3',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json'
)
"""
t_env.sql_update(source_ddl)

es_sink_ddl = """
CREATE TABLE es_sink (
platform VARCHAR,
pay_amount DOUBLE,
rowtime TIMESTAMP(3)
) with (
'connector.type' = 'elasticsearch',
'connector.version' = '7',
'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'platform_pay_amount_1',
'connector.document-type' = 'payment',
'update-mode' = 'upsert',
'connector.flush-on-checkpoint' = 'true',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.bulk-flush.max-size' = '42mb',
'connector.bulk-flush.max-actions' = '32',
'connector.bulk-flush.interval' = '1000',
'connector.bulk-flush.backoff.delay' = '1000',
'format.type' = 'json'
)
"""

t_env.sql_update(es_sink_ddl)

t_env.register_function('platformcodetoname', platform_code_to_name)

query = """
select platformcodetoname(payPlatform) as platform, sum(payAmount)
as pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT)
as rowtime
from payment_msg
group by tumble(rt, interval '5' seconds), payPlatform
"""

count_result = t_env.sql_query(query)

t_env.create_temporary_view('windowed_values', count_result)

query2 = """
select platform, last_value(pay_amount), rowtime from
windowed_values group by platform, rowtime
"""

final_result = t_env.sql_query(query2)

final_result.execute_insert(table_path='es_sink')


if __name__ == '__main__':
log_processing()


奇怪的不朽琴师 <1129656...@qq.com> wrote on Wednesday, July 15, 2020, 4:40 PM:

> Hi Shuiqiang:
>
> Could I ask you to contribute a complete code example? I am a beginner. The official 2-from_kafka_to_kafka.py has no window, and I now want a demo that adds a window on top of it; I have tried for a long time without success. After adding window functionality to this demo I keep running into all kinds of problems, which is very painful. Your help would be greatly appreciated.
>
>
> A plea to every expert who sees this mail!
>
>
> Thanks!
>
>
>
>
> ------ Original message ------
> From:
>   "user-zh"
> <
> acqua@gmail.com>;
> Sent: Wednesday, July 15, 2020, 11:25 AM
> To: "user-zh"
> Subject: Re: pyflink1.11.0window
>
>
>
> Here is a SQL example:
> select platformcodetoname(payPlatform) as platform, sum(payAmount) as
> pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT) as
> rowtime
> from payment_msg group by tumble(rt, interval '5' seconds), payPlatform
> This query aggregates over each 5-second tumble window.
>
> 奇怪的不朽琴师 <1129656...@qq.com> wrote on Wednesday, July 15, 2020, 11:10 AM:
>
>  Hi Shuiqiang:
>  My goal is to produce an aggregate at a fixed interval, for example every two seconds. How should I define the window for this requirement?
>
>
>  ------ Original message ------
>  From: "user-zh" <acqua@gmail.com>;
>  Sent: Wednesday, July 15, 2020, 10:51 AM
>  To: "user-zh"
>  Subject: Re: pyflink1.11.0window
>
>
>
>  Hi 琴师,
>  The exception stack "org.apache.flink.table.api.ValidationException: A tumble window
>  expects a size value literal."
>  suggests that the code defining the tumble window is not quite right.
>
>  Best,
>  Shuiqiang
>
>  奇怪的不朽琴师 <1129656...@qq.com> wrote on Wednesday, July 15, 2020, 10:27 AM:
>
>  > Hi:
>  > I changed the source as you suggested in your reply, but a new error is reported. What is the cause of this? I have been trying to debug a window without success; please help me, thanks.
>  > Traceback (most recent call last):
>  >   File "tou.py", line 71, in 
>  >     from_kafka_to_kafka_demo()
>  >   File "tou.py", line 21, in from_kafka_to_kafka_demo
>  > 

Re: Flink 1.11 submit job timed out

2020-07-15 Thread SmileSmile
Hi Roc

This phenomenon does not occur on 1.10.1; it only appears on 1.11. How would you suggest investigating it?



a511955993
Email: a511955...@163.com

Signature customized by NetEase Mail Master

On 07/15/2020 17:16, Roc Marshal wrote:
Hi, SmileSmile.
I ran into a similar host resolution problem before; you could troubleshoot it from the angle of the k8s pod network mapping.
I hope this helps.


Best regards.
Roc Marshal











On 2020-07-15 17:04:18, "SmileSmile" wrote:
>
>Hi
>
>Version: Flink 1.11, deployed as a kubernetes session. 30 TMs with 4 slots each, job 
>parallelism 120. When submitting the job, a large number of "No hostname could be resolved for the IP address" messages appear, the JM times 
>out, and the job submission fails. The web UI also hangs and becomes unresponsive.
>
>With WordCount at parallelism 1 a few "no hostname" log lines are also printed, but the job then submits normally; once the parallelism goes up, it times out.
>
>
>Part of the log is as follows:
>
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.32.160.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.44.224.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,461 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.40.32.9, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The 
>heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
>Disconnect job manager 
>0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2
> for job e1554c737e37ed79688a15c746b6e9ef from the resource manager.
>
>
>How should I deal with this?
>
>
>Best!
>
>a511955993
>Email: a511955...@163.com
>
>Signature customized by NetEase Mail Master


Re:Flink 1.11 submit job timed out

2020-07-15 Thread Roc Marshal
Hi, SmileSmile.
I ran into a similar host resolution problem before; you could troubleshoot it from the angle of the k8s pod network mapping.
I hope this helps.


Best regards.
Roc Marshal











On 2020-07-15 17:04:18, "SmileSmile" wrote:
>
>Hi
>
>Version: Flink 1.11, deployed as a kubernetes session. 30 TMs with 4 slots each, job 
>parallelism 120. When submitting the job, a large number of "No hostname could be resolved for the IP address" messages appear, the JM times 
>out, and the job submission fails. The web UI also hangs and becomes unresponsive.
>
>With WordCount at parallelism 1 a few "no hostname" log lines are also printed, but the job then submits normally; once the parallelism goes up, it times out.
>
>
>Part of the log is as follows:
>
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.32.160.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.44.224.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,461 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.40.32.9, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The 
>heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
>Disconnect job manager 
>0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2
> for job e1554c737e37ed79688a15c746b6e9ef from the resource manager.
>
>
>How should I deal with this?
>
>
>Best!
>
>a511955993
>Email: a511955...@163.com
>
>Signature customized by NetEase Mail Master


Re: [Help] Flink Hadoop dependency problem

2020-07-15 Thread Roc Marshal



Hi Z-Z,

You can try downloading the corresponding uber jar from 
https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/ 
and put the downloaded jar file into the ${FLINK_HOME}/lib path of the Flink image, then start the orchestrated containers.
Best regards.
Roc Marshal.











On 2020-07-15 10:47:39, "Z-Z" wrote:
>I am using Flink 1.11.0, set up with docker-compose. The docker-compose file is as follows:
>version: "2.1"
>services:
> jobmanager:
>  image: flink:1.11.0-scala_2.12
>  expose:
>   - "6123"
>  ports:
>   - "8081:8081"
>  command: jobmanager
>  environment:
>   - JOB_MANAGER_RPC_ADDRESS=jobmanager
>   - 
>HADOOP_CLASSPATH=/data/hadoop-2.9.2/etc/hadoop:/data/hadoop-2.9.2/share/hadoop/common/lib/*:/data/hadoop-2.9.2/share/hadoop/common/*:/data/hadoop-2.9.2/share/hadoop/hdfs:/data/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/data/hadoop-2.9.2/share/hadoop/hdfs/*:/data/hadoop-2.9.2/share/hadoop/yarn:/data/hadoop-2.9.2/share/hadoop/yarn/lib/*:/data/hadoop-2.9.2/share/hadoop/yarn/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar
>  volumes:
>   - ./jobmanager/conf:/opt/flink/conf
>   - ./data:/data
>
>
> taskmanager:
>  image: flink:1.11.0-scala_2.12
>  expose:
>   - "6121"
>   - "6122"
>  depends_on:
>   - jobmanager
>  command: taskmanager
>  links:
>   - "jobmanager:jobmanager"
>  environment:
>   - JOB_MANAGER_RPC_ADDRESS=jobmanager
>  volumes:
>   - ./taskmanager/conf:/opt/flink/conf
>networks:
> default:
>  external:
>   name: flink-network
>
>
>
>hadoop-2.9.2 has already been placed in the data directory, and HADOOP_CLASSPATH has been added to the environment variables of both jobmanager and taskmanager, but when submitting via the CLI or the web UI, the jobmanager still reports "Could
> not find a file system implementation for scheme 'hdfs'". Does anyone know what is going on?


Flink 1.11 submit job timed out

2020-07-15 Thread SmileSmile

Hi

Version: Flink 1.11, deployed as a kubernetes session. 30 TMs with 4 slots each, job 
parallelism 120. When submitting the job, a large number of "No hostname could be resolved for the IP address" messages appear, the JM times 
out, and the job submission fails. The web UI also hangs and becomes unresponsive.

With WordCount at parallelism 1 a few "no hostname" log lines are also printed, but the job then submits normally; once the parallelism goes up, it times out.


Part of the log is as follows:

2020-07-15 16:58:46,460 WARN  
org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
could be resolved for the IP address 10.32.160.7, using IP address as host 
name. Local input split assignment (such as for HDFS files) may be impacted.
2020-07-15 16:58:46,460 WARN  
org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
could be resolved for the IP address 10.44.224.7, using IP address as host 
name. Local input split assignment (such as for HDFS files) may be impacted.
2020-07-15 16:58:46,461 WARN  
org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
could be resolved for the IP address 10.40.32.9, using IP address as host name. 
Local input split assignment (such as for HDFS files) may be impacted.

2020-07-15 16:59:10,236 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The 
heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
2020-07-15 16:59:10,236 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Disconnect job manager 
0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2
 for job e1554c737e37ed79688a15c746b6e9ef from the resource manager.


How should I deal with this?


Best!

a511955993
Email: a511955...@163.com

Signature customized by NetEase Mail Master

Re: FlinkSQL: Chinese characters garbled after writing into MySQL

2020-07-15 Thread wangl...@geekplus.com.cn

It was a character set problem on the server where MySQL_tableB lives.
Adding the following to that MySQL server's configuration fixed it:


[mysqld]
character-set-server=utf8
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8




wangl...@geekplus.com.cn 

From: wangl...@geekplus.com.cn
Sent: 2020-07-15 16:34
To: user-zh
Subject: FlinkSQL: Chinese characters garbled after writing into MySQL
 
KafkaTable: Kafka messages
MySQL_tableA: dimension table; the values in it are Chinese characters
MySQL_tableB: the result table after the join; not on the same server as MySQL_tableA.
 
A SELECT directly in the Flink SQL client displays the characters correctly, but after INSERT INTO MySQL_tableB SELECT, 
the Chinese characters are garbled when I look at MySQL_tableB.
Does anyone have suggestions?
 
Thanks,
Wang Lei
 
 
 
wangl...@geekplus.com.cn 


Re: ERROR submitting a flink job

2020-07-15 Thread Yun Tang
Hi Aissa

The job exits because "Recovery is suppressed by NoRestartBackoffTimeStrategy":
Flink uses the "no restart" strategy when checkpointing is not enabled [1].
In other words, you should look at why the job failed the first time; once the
job has failed, you will see the errors you pasted whenever the strategy is
"no restart".
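
For illustration, a minimal sketch (placeholder values, not taken from your job): enabling
checkpointing switches the default away from "no restart", and a restart strategy can also
be set explicitly (RestartStrategies is org.apache.flink.api.common.restartstrategy.RestartStrategies,
Time is org.apache.flink.api.common.time.Time):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // With checkpointing enabled, the default restart strategy becomes fixed-delay
    // restart instead of "no restart".
    env.enableCheckpointing(60_000);
    // Or configure a strategy explicitly:
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3, Time.of(10, TimeUnit.SECONDS)));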

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html#restart-strategies

Best
Yun Tang

From: Aissa Elaffani 
Sent: Wednesday, July 15, 2020 7:29
To: user@flink.apache.org 
Subject: ERROR submitting a flink job

Hello Guys,
I am trying to launch a Flink app on a remote server, but I get this error
message.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: org.apache.flink.client.program.ProgramInvocationException: 
Job failed (JobID: 8bf7f299746e051ea7b94afd07e29d3d)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
8bf7f299746e051ea7b94afd07e29d3d)
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at sensors.StreamingJob.main(StreamingJob.java:145)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 8bf7f299746e051ea7b94afd07e29d3d)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 

Re: Flink 1.11: using JdbcRowDataOutputFormat in a custom RichFlatMapFunction to write data to PostgreSQL; RuntimeContext initialization problem (NullPointerException or RuntimeContext not initialized). What am I using incorrectly?

2020-07-15 Thread jindy_liu
It is indeed this line that causes it.
If it has all been refactored, what is the recommended way to use it now?
I need to know whether each row corresponds to an insert, update or delete event.
Or, to put the question differently: for this kind of API, what rules should one follow to get better compatibility across Flink versions?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink 1.11.0 window

2020-07-15 Thread ??????????????
Shuiqiang
  
hi2-from_kafka_to_kafka.py??demodemo??












Re: Flink 1.9.2 upgrade to 1.10.0: failed job cannot recover from checkpoint

2020-07-15 Thread Robin Zhang
As far as I know, you cannot restore directly from a checkpoint across major versions; the only option is to discard the state and rerun the job.

Best
Robin Zhang

From: Peihui He <[hidden email]>
Sent: Tuesday, July 14, 2020 10:42
To: [hidden email] <[hidden email]>
Subject: Flink 1.9.2 upgrade to 1.10.0: failed job cannot recover from checkpoint

hello,

After upgrading to 1.10.0, when the program fails it tries to recover from the checkpoint, but the recovery always fails with:


Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
->
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

The configuration is the same as with 1.9.2:
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
state.savepoints.dir: hdfs:///flink/savepoints/wc/
state.backend.incremental: true

The code also contains:

env.enableCheckpointing(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));


  Is there any special configuration required for 1.10.0?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: RestartStrategy failure count when losing a Task Manager

2020-07-15 Thread Chesnay Schepler
1) A restart of one region only increments the count by 1, independent
of how many tasks from that region fail at the same time.
If tasks from different regions fail at the same time, then the count is
incremented by the number of affected regions.


2)

I would consider what failure rate would be acceptable if there were no
regions, and then multiply that by the number of slots to offset task
executor failures.



Failures in the application (e.g., a source failing for some reason)
will generally behave, failure-rate wise, as if regions did not exist.
They are sporadic, and the chance of them appearing in different regions
at the same time seems rather small.
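
To make that sizing rule concrete, a hedged sketch with made-up numbers (say 3
acceptable failures per 30 minutes if there were no regions, and 4 slots per
task executor), using org.apache.flink.api.common.restartstrategy.RestartStrategies
and org.apache.flink.api.common.time.Time:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 3 failures * 4 slots = 12 failures per interval to absorb a task executor loss.
    env.setRestartStrategy(RestartStrategies.failureRateRestart(
            12,                             // max failures within the interval
            Time.of(30, TimeUnit.MINUTES),  // failure rate measurement interval
            Time.of(1, TimeUnit.MINUTES))); // delay between restart attempts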



On 15/07/2020 00:16, Jiahui Jiang wrote:
Hello Flink, I have some questions regarding the guidelines on
configuring a restart strategy.


I was testing a job with the following setup:

 1. There are many tasks, but currently I'm running with a parallelism of
only 2, though with plenty of task slots (4 TMs and 4 task slots in
each TM).
 2. It's run in k8s with HA enabled.
 3. The current restart strategy is 'failure-rate' with a 30 min failure
interval, a 1 min delay interval and a failure rate of 3.

When a TM got removed by k8s, it looked like that caused multiple 
failures to happen all at once. In the job manager log, I'm seeing 
different tasks fail with the same stacktrace 'Heartbeat of 
taskManager with id {SOME_ID} timed out' around the same time.


I understand that all the tasks that were running on this taskManager 
would fail, but I still have the following questions:


Questions:

 1. What counts as one failure for a restartStrategy? Judging by my
other jobs, it doesn't look like every failed task counts as one
failure. Is it because the failover strategy defaults to
region, so each failure only triggers part of the job graph to
restart, and the rest of the 'not retriggered' job graph can still
cause more failures that are counted towards the failure rate?
 2. If that's the case, what is the recommended way to set the
restart strategy? If we don't want to hard-code a number for every
single pipeline we are running, is there a good way to infer how to
set the failure rate?

Thank you so much!
Jiahui





Re: How to debug window states

2020-07-15 Thread Paul Lam
It turns out to be a bug in StateTTL [1], but I'm still interested in how to debug 
window states. 

Any suggestions are appreciated. Thanks!

1. https://issues.apache.org/jira/browse/FLINK-16581 


Best,
Paul Lam

> 2020年7月15日 13:13,Paul Lam  写道:
> 
> Hi,
> 
> Since currently State Processor doesn’t support window states, what’s the 
> recommended way to debug window states?
> 
> The background is that I have a SQL job whose states, mainly window 
> aggregation operator states, keep increasing steadily.
> 
> The things I’ve checked:
> - Outputs are as expected.
> - Keys amount are capped.
> - Watermarks are good, no lags.
> - No back pressure.
> 
> If I can directly read the states to find out what’s accountable for the 
> state size growth, that would be very intuitional and time-saving.
> 
> Best,
> Paul Lam
> 



Re: Flink 1.11 test Parquet sink

2020-07-15 Thread Dawid Wysakowicz
Hi,

Unfortunately this is a bug.

The problem is in CustomizedConvertRule#convertCast as it drops the
requested nullability. It was fixed in master as part of FLINK-13784[1].
Therefore the example works on master.

Could you create a jira issue for the 1.11 version? We could backport the
corresponding part of FLINK-13784. As a workaround, you can try using the
values without registering them in the catalog, as the registration
triggers the type check. (I know this is not perfect):

    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), ...);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    inputTable.executeInsert("out");

As for the types: SQL has neither LONG nor STRING types. Java's long is
equivalent to SQL's BIGINT, and STRING is only an alias for
VARCHAR(Long.MAX_VALUE), which was added for usability so that
you do not need to type the maximum length manually. For a complete list of
supported types see the docs [2].
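
For example, a minimal DDL sketch along these lines (a hedged example only:
connector options are trimmed and the path is a placeholder):

    tableEnv.executeSql(
        "CREATE TABLE `out` (\n"
            + "  col1 BIGINT,\n"   // BIGINT is the SQL counterpart of Java's long
            + "  col2 STRING\n"    // STRING is shorthand for VARCHAR(2147483647)
            + ") WITH (\n"
            + "  'connector' = 'filesystem',\n"
            + "  'format' = 'parquet',\n"
            + "  'path' = 'file:///tmp/out'\n"
            + ")");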


[1] https://issues.apache.org/jira/browse/FLINK-13784

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html

Best,

Dawid

On 15/07/2020 09:40, Flavio Pompermaier wrote:
> If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things works if
> I change also the query to "INSERT INTO `out` SELECT CAST(f0 AS
> STRING) ,f1 FROM ParquetDataset".
> If there is still a bug fill a proper JIRA ticket with the exact
> description of the problem..
>
> Just to conclude this thread there are 2 strange things I found:
>
> 1) Is LONG really not supported yet? If I use as output table
> LONG,STRING I get
>       Exception in thread "main"
> java.lang.UnsupportedOperationException: class
> org.apache.calcite.sql.SqlIdentifier: LONG
>       at org.apache.calcite.util.Util.needToImplement(Util.java:967)
>
> 2) The new planner translates STRING to VARCHAR(2147483647). Is it
> correct?
>
> Best,
> Flavio
>
>
> On Wed, Jul 15, 2020 at 5:28 AM Jark Wu  > wrote:
>
> I think this might be a bug in `tableEnv.fromValues`.
>
> Could you try to remove the DataType parameter, and let the
> framework derive the types?
>
> final Table inputTable = tableEnv.fromValues(
>         Row.of(1L, "Hello"), //
>         Row.of(2L, "Hello"), //
>         Row.of(3L, ""), //
>         Row.of(4L, "Ciao"));
>
> Best,
> Jark
>
>
> On Wed, 15 Jul 2020 at 11:19, Leonard Xu  > wrote:
>
> Hi, Flavio
>
> I reproduced your issue, and I think it should be a bug. But
> I’m not sure it comes from Calcite or Flink shaded Calcite,
> Flink Table Planner module shaded calcite. 
>
> Maybe Danny can help explain more.
>
> CC: Danny
>
> Best
> Leonard Xu
>
>> 在 2020年7月14日,23:06,Flavio Pompermaier > > 写道:
>>
>> If I use 
>>
>> final Table inputTable = tableEnv.fromValues(
>>         DataTypes.ROW(
>>             DataTypes.FIELD("col1",
>> DataTypes.STRING().notNull()),
>>             DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>>         ), ..
>>   tableEnv.executeSql(//
>>         "CREATE TABLE `out` (" +
>>             "col1 STRING," + 
>>             "col2 STRING" + 
>>             ") WITH (...)
>>
>> the job works as expected but this is wrong IMHO
>> because DataTypes.STRING() = DataTypes.STRING().nullable() by
>> default.
>> If I have DataTypes.STRING().notNull() the type in the CREATE
>> TABLE should be "STRING NOT NULL" . Am I correct?
>>
>> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier
>> mailto:pomperma...@okkam.it>> wrote:
>>
>> Sorry, obviously  " 'format' = 'parquet'" + is
>> without comment :D
>>
>> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier
>> mailto:pomperma...@okkam.it>> wrote:
>>
>> Hi to all,
>> I'm trying to test write to parquet using the
>> following code but I have an error:
>>
>>  final TableEnvironment tableEnv =
>> DatalinksExecutionEnvironment.getBatchTableEnv();
>>     final Table inputTable = tableEnv.fromValues(//
>>         DataTypes.ROW(//
>>             DataTypes.FIELD("col1",
>> 

Communicating with my operators

2020-07-15 Thread Tom Wells
Hi Everyone

I'm looking for some advice on designing my operators (which unsurprisingly
tend to take the form of SourceFunctions, ProcessFunctions or
SinkFunctions) to allow them to be "dynamically configured" while running.

By way of example, I have a SourceFunction which collects the names of
various S3 buckets, and then a ProcessFunction which reads and collects
their contents. The gotcha is that the list of S3 buckets is not fixed, and
can be changed during the lifetime of the job. This add/remove action would
be done by some human administrator, let's say using a simple command-line
tool.

For example - here is an idea of what I want to build to "communicate" with
my flink job:

```
# Add a bucket to the flink job to process
$ ./admin-tool add-bucket --name my-s3-bucket --region eu-west-1
--access-key ...

# Get a list of the s3 buckets we're currently processing, and when they
# were last accessed
$ ./admin-tool list-buckets
my-s3-bucket | eu-west-1 | 5 seconds ago

# Remove buckets
$ ./admin-tool remove-bucket --name my-s3-bucket
```

Hope that gives you an idea - of course this could apply to any number of
different source types, and could even extend to configuration of sinks etc
too.

So - how should my command line tool communicate with my operators?

Five alternative approaches I've thought about:

- Have a SourceFunction open a websocket and listen for bucket add/remove
commands (written to by the command line tool). I think this would work,
but the difficulty is in figuring out where exactly the SourceFunction
might be deployed in the flink cluster to find the websocket listening
port. I took a look at the ClusterClient API and it's possibly available by
inspecting the JobGraph... I'm just not sure if this is an anti-pattern?

- Use a CoProcessFunction instead, and have it be joined with a DataStream
that I can somehow write to directly from the command line tool (maybe
using the flink-client API - can I write to a DataStream directly?). I don't
think this is possible but would feel like a good clean approach?

- Somehow using the ParameterTool. I don't think it supports a dynamic
use-case though?

- Writing directly to the saved state of a ProcessFunction to add or
remove bucket names. I'm pretty unfamiliar with this approach - but looks
possible according to the docs on the State Processor API - however it
seems like I would have to read the savepoint, write the updates, then
restore from savepoint which may mean suspending and resuming the job while
that happens. Not really an issue for me, but does feel like possibly the
wrong approach for my simple requirement.

- Solve it just using datasources - e.g. create a centrally read s3 bucket
which holds the latest configuration and is sourced and joined by every
operator (probably using Broadcast State). My command line tool would then
just have to write to that S3 bucket - no need to communicate directly with
the operators.

The last option is fairly obvious and probably my default approach - I'm
just wondering whether any of the alternatives above are worth
investigating. (Especially considering my endless quest to learn everything
about Flink - I don't mind exploring the less obvious pathways.)
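
For what it's worth, here is a rough sketch of how I imagine the broadcast-state
version of that last option could look (Event, BucketConfig, the two input
streams and their fields are placeholders I made up, not anything existing):

```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicBuckets {

  public static DataStream<String> withDynamicBuckets(
      DataStream<Event> mainStream,             // placeholder: the main event stream
      DataStream<BucketConfig> configUpdates) { // placeholder: stream written by the admin tool

    // Broadcast state descriptor: bucket name -> bucket config.
    MapStateDescriptor<String, BucketConfig> configDescriptor = new MapStateDescriptor<>(
        "bucket-config", Types.STRING, TypeInformation.of(BucketConfig.class));

    BroadcastStream<BucketConfig> configBroadcast = configUpdates.broadcast(configDescriptor);

    return mainStream
        .connect(configBroadcast)
        .process(new BroadcastProcessFunction<Event, BucketConfig, String>() {
          @Override
          public void processBroadcastElement(BucketConfig cfg, Context ctx, Collector<String> out) throws Exception {
            // Add or update the bucket; a "remove" flag on BucketConfig could delete it instead.
            ctx.getBroadcastState(configDescriptor).put(cfg.name, cfg);
          }

          @Override
          public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            // The non-broadcast side sees a read-only view of the current bucket set.
            if (ctx.getBroadcastState(configDescriptor).contains(event.bucketName)) {
              out.collect(event.toString());
            }
          }
        });
  }
}
```

The admin tool would then only need to publish BucketConfig records to whatever
backs configUpdates (a Kafka topic, the central S3 config bucket, etc.).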

I would love some guidance or advice on what you feel is the best approach
/ idiomatic approach for this.

All the best,
Tom


Re: Flink 1.11 test Parquet sink

2020-07-15 Thread Flavio Pompermaier
If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things work if I
also change the query to "INSERT INTO `out` SELECT CAST(f0 AS STRING), f1
FROM ParquetDataset".
If there is still a bug, I'll file a proper JIRA ticket with the exact
description of the problem.

Just to conclude this thread there are 2 strange things I found:

1) Is LONG really not supported yet? If I use as output table LONG,STRING I
get
  Exception in thread "main" java.lang.UnsupportedOperationException:
class org.apache.calcite.sql.SqlIdentifier: LONG
  at org.apache.calcite.util.Util.needToImplement(Util.java:967)

2) The new planner translates STRING to VARCHAR(2147483647). Is it correct?

Best,
Flavio


On Wed, Jul 15, 2020 at 5:28 AM Jark Wu  wrote:

> I think this might be a bug in `tableEnv.fromValues`.
>
> Could you try to remove the DataType parameter, and let the framework
> derive the types?
>
> final Table inputTable = tableEnv.fromValues(
> Row.of(1L, "Hello"), //
> Row.of(2L, "Hello"), //
> Row.of(3L, ""), //
> Row.of(4L, "Ciao"));
>
> Best,
> Jark
>
>
> On Wed, 15 Jul 2020 at 11:19, Leonard Xu  wrote:
>
>> Hi, Flavio
>>
>> I reproduced your issue, and I think it should be a bug. But I’m not sure
>> it comes from Calcite or Flink shaded Calcite, Flink Table Planner module
>> shaded calcite.
>>
>> Maybe Danny can help explain more.
>>
>> CC: Danny
>>
>> Best
>> Leonard Xu
>>
>> 在 2020年7月14日,23:06,Flavio Pompermaier  写道:
>>
>> If I use
>>
>> final Table inputTable = tableEnv.fromValues(
>> DataTypes.ROW(
>> DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
>> DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>> ), ..
>>   tableEnv.executeSql(//
>> "CREATE TABLE `out` (" +
>> "col1 STRING," +
>> "col2 STRING" +
>> ") WITH (...)
>>
>> the job works as expected but this is wrong IMHO
>> because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
>> If I have DataTypes.STRING().notNull() the type in the CREATE TABLE
>> should be "STRING NOT NULL" . Am I correct?
>>
>> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier 
>> wrote:
>>
>>> Sorry, obviously  " 'format' = 'parquet'" + is without comment :D
>>>
>>> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier 
>>> wrote:
>>>
 Hi to all,
 I'm trying to test write to parquet using the following code but I have
 an error:

  final TableEnvironment tableEnv =
 DatalinksExecutionEnvironment.getBatchTableEnv();
 final Table inputTable = tableEnv.fromValues(//
 DataTypes.ROW(//
 DataTypes.FIELD("col1", DataTypes.STRING()), //
 DataTypes.FIELD("col2", DataTypes.STRING())//
 ), //
 Row.of(1L, "Hello"), //
 Row.of(2L, "Hello"), //
 Row.of(3L, ""), //
 Row.of(4L, "Ciao"));
 tableEnv.createTemporaryView("ParquetDataset", inputTable);
 tableEnv.executeSql(//
 "CREATE TABLE `out` (\n" + //
 "col1 STRING,\n" + //
 "col2 STRING\n" + //
 ") WITH (\n" + //
 " 'connector' = 'filesystem',\n" + //
 // " 'format' = 'parquet',\n" + //
 " 'update-mode' = 'append',\n" + //
 " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
 " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
 ")");

 tableEnv.executeSql("INSERT INTO `out` SELECT * FROM
 ParquetDataset");

 -

 Exception in thread "main" java.lang.AssertionError: Conversion to
 relational algebra failed to preserve datatypes:
 validated type:
 RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1,
 VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
 converted type:
 RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1,
 VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
 rel:
 LogicalProject(col1=[$0], col2=[$1])
   LogicalUnion(all=[true])
 LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET
 "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
 "UTF-16LE"])
   LogicalValues(tuples=[[{ 0 }]])
 LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET
 "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
 "UTF-16LE"])
   LogicalValues(tuples=[[{ 0 }]])
 LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET
 "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
 "UTF-16LE"])
   LogicalValues(tuples=[[{ 0 }]])
 LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET
 "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET
 "UTF-16LE"])
   

Re: flink state

2020-07-15 Thread Robert.Zhang
Keyed state and broadcast state are different: keyed state is partitioned by key,
while broadcast state has no key. On the non-broadcast side, the broadcast state
is read-only.





[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian
Flink 1.11 SQL Kafka: extracting the event time

2020-07-15 Thread 18500348...@163.com
Hi all,

I am using Flink 1.11 SQL to read from Kafka with format = csv,
and I extract the event time from the eventTime field:
rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, 'yyyy-MM-dd HH:mm:ss'))
eventTime may contain dirty data (values that are not 13-digit millisecond timestamps). With 'csv.ignore-parse-errors' = 'true' set, eventTime is then
set to null and the following exception is thrown:
Caused by: java.lang.RuntimeException: RowTime field should not be null, please 
convert it to a non-null long value.

Is there a good way to handle this?


Best regards!



18500348...@163.com


Re: flink on yarn logging

2020-07-15 Thread Cayden chen
 
With logback you can implement a custom appender (package the appender in the job jar; on the Flink taskmanager it can obtain the applicationId), and use MDC.put() to pass the appId so that the appender can pick it up.




https://logging.apache.org/log4j/2.x/manual/appenders.html

Best,
Yangze Guo

Links referenced earlier in the thread:
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn