Trying to eagerly schedule a task whose inputs are not ready

2020-07-15 Post by jiafu
flink
org.apache.flink.runtime.executiongraph.ExecutionGraphException: Trying to eagerly schedule a task whose inputs are not ready (result type: PIPELINED_BOUNDED, partition consumable: false, producer state: SCHEDULED, producer slot: null).
    at org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor.fromEdges(InputChannelDeploymentDescriptor.java:145)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:840)
    at org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:621)
    at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
    at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
    at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
    at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010)
    at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:436)
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:637)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:229)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:186)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:96)
    at org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:146)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:633)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$releaseAssignedResource$11(Execution.java:1350)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
    at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
    at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1345)
    at org.apache.flink.runtime.executiongraph.Execution.finishCancellation(Execution.java:1115)
    at org.apache.flink.runtime.executiongraph.Execution.completeCancelling(Execution.java:1094)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1628)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:517)
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: [sql-client] How to set checkpointing.interval when submitting SQL through sql-client

2020-07-15 Post by Harold.Miao
Do you mean configuring this parameter in flink-conf.yaml?
execution.checkpointing.interval


godfrey he wrote on Thu, Jul 16, 2020 at 1:37 PM:

> Configuring checkpointing.interval in sql-client-defaults.yaml is not supported yet;
> you can configure it in flink-conf.yaml instead.
>
> Harold.Miao wrote on Thu, Jul 16, 2020 at 1:27 PM:
>
> > hi flink users
> >
> > How do I set checkpointing.interval when submitting SQL through sql-client?
> > I looked at the execution section in sql-client-defaults.yaml and did not find this parameter. Any advice would be appreciated.
> > Thanks
> >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


Re: [sql-client] How to set checkpointing.interval when submitting SQL through sql-client

2020-07-15 Post by godfrey he
Configuring checkpointing.interval in sql-client-defaults.yaml is not supported yet;
you can configure it in flink-conf.yaml instead.
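
For reference, a minimal flink-conf.yaml sketch (the interval value here is only an example):

    # flink-conf.yaml: enable periodic checkpointing for jobs submitted through sql-client
    execution.checkpointing.interval: 10s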

Harold.Miao wrote on Thu, Jul 16, 2020 at 1:27 PM:

> hi flink users
>
> > How do I set checkpointing.interval when submitting SQL through sql-client?
> > I looked at the execution section in sql-client-defaults.yaml and did not find this parameter. Any advice would be appreciated.
> > Thanks
>
>
>
> --
>
> Best Regards,
> Harold Miao
>


[sql-client] How to set checkpointing.interval when submitting SQL through sql-client

2020-07-15 Post by Harold.Miao
hi flink users

How do I set checkpointing.interval when submitting SQL through sql-client?
I looked at the execution section in sql-client-defaults.yaml and did not find this parameter. Any advice would be appreciated.
Thanks



-- 

Best Regards,
Harold Miao


Re: Flink 1.11 submit job timed out

2020-07-15 Post by Congxian Qiu
Hi
   If there is no exception and GC looks normal, you could take a look at the pod logs, and at the zk logs as well if HA is enabled. I once saw a similar symptom on Yarn
that turned out to have a different cause, which was found by reading the NM logs and the zk logs.

Best,
Congxian


SmileSmile wrote on Wed, Jul 15, 2020 at 5:20 PM:

> Hi Roc
>
> This did not happen on 1.10.1; it only shows up in 1.11. How would you suggest investigating it?
>
>
>
> | |
> a511955993
> |
> |
> Mail: a511955...@163.com
> |
>
> Signature customized by NetEase Mail Master
>
> On 07/15/2020 17:16, Roc Marshal wrote:
> Hi,SmileSmile.
> I have run into a similar hostname-resolution problem before; you could start by checking the network mapping of the k8s pod nodes.
> Hope this helps.
>
>
> Best.
> Roc Marshal
>
>
>
>
>
>
>
>
>
>
>
> On 2020-07-15 17:04:18, "SmileSmile" wrote:
> >
> >Hi
> >
> >Flink 1.11, deployed as a kubernetes session, 30 TMs with 4 slots each, job
> parallelism 120. When submitting the job, a large number of "No hostname could be resolved for the IP address" messages appear, the JM times
> out, and the job submission fails. The web UI also hangs and becomes unresponsive.
> >
> >With a WordCount job at parallelism 1, a few "no hostname" log lines are printed and the job is then submitted normally; once the parallelism goes up, it times out.
> >
> >
> >Some of the logs:
> >
> >2020-07-15 16:58:46,460 WARN
> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> hostname could be resolved for the IP address 10.32.160.7, using IP address
> as host name. Local input split assignment (such as for HDFS files) may be
> impacted.
> >2020-07-15 16:58:46,460 WARN
> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> hostname could be resolved for the IP address 10.44.224.7, using IP address
> as host name. Local input split assignment (such as for HDFS files) may be
> impacted.
> >2020-07-15 16:58:46,461 WARN
> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> hostname could be resolved for the IP address 10.40.32.9, using IP address
> as host name. Local input split assignment (such as for HDFS files) may be
> impacted.
> >
> >2020-07-15 16:59:10,236 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The
> heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
> >2020-07-15 16:59:10,236 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Disconnect job manager 
> @akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2 for job
> e1554c737e37ed79688a15c746b6e9ef from the resource manager.
> >
> >
> >how to deal with ?
> >
> >
> >beset !
> >
> >| |
> >a511955993
> >|
> >|
>Mail: a511955...@163.com
>|
>
>Signature customized by NetEase Mail Master
>


Re: Re: Reply: flink state

2020-07-15 Post by Congxian Qiu
Hi
Broadcast state cannot be modified. If you still want to modify it, you can use zhao liang's approach; alternatively, if this global state
does not need consistency guarantees and the like, you can also consider keeping it in external storage (Redis, HBase, etc.).

Best,
Congxian


zhao liang wrote on Wed, Jul 15, 2020 at 6:05 PM:

> Broadcast
> state cannot satisfy your requirement. You probably have to do what I do: fold the state data involved into the data stream itself and distinguish the different record types inside the operator, which amounts to maintaining the changes of this "broadcast" stream by hand.
>
> From: Robert.Zhang <173603...@qq.com>
> Date: Wednesday, July 15, 2020 15:22
> To: user-zh , user-zh@flink.apache.org <
> user-zh@flink.apache.org>
> Subject: Re: Reply: flink state
> The thing is, the problem is that I need to use keyed state to modify the broadcast state, e.g. based on the keyed
> state store certain keys that meet a condition into this broadcast state, and use this broadcast
> state when other operators compute, for example because those keys are needed there.
> The docs say the non-broadcast side cannot modify the broadcast state, it is read-only,
> so there seems to be no direct way to implement this.
>
>
>
>
>
> ------ Original message ------
> From: "zhao liang"  Sent: Tuesday, July 14, 2020 4:09 PM
> To: "user-zh"  Subject: Reply: flink state
>
>
>
> I have a similar implementation here: the stream processing has to change according to dimension-table data, so I wrote a custom source (which periodically reloads the dimension table from MySQL), and the Kafka stream
> is unioned with this dimension-table stream.
> An extra record type (dimension record vs. fact record) is added for processing, and downstream operators handle the dimension records separately and store them in their own operator state.
>
> From: Congxian Qiu  Date: Tuesday, July 14, 2020 14:03
> To: user-zh  Subject: Re: flink state
> Hi Robert
>
> Does broadcast state [1] meet your needs? You can also take a look at this article [2].
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
> [2] https://cloud.tencent.com/developer/article/1509789
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com> wrote on Mon, Jul 13, 2020 at 9:50 PM:
>
 Hello, all
 I have run into a problem with a stream job:
 I want to use a global state in all keyed streams, or a global
 parameter. The main requirement is that this state is mutable: it needs to be modified and be visible to all stream
 operators. Has anyone met a similar scenario or can offer some ideas? Much appreciated.
> 
> 
>  Best regards
>


Re: flink sql 1.11 create hive table error

2020-07-15 Post by Leonard Xu
Hello, Zach

Yes, 1.12 will support it. The PR [1] is already open and under review.

Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18588 
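
Until then, one possible workaround is to check the catalog before running the DDL and drop the IF NOT EXISTS clause. A rough sketch against the Scala code quoted below, assuming the same odsHiveCatalog and streamTableEnv variables; createHiveTableDdl is a placeholder for the same DDL string without IF NOT EXISTS:

    import org.apache.flink.table.catalog.ObjectPath

    // only run the CREATE TABLE DDL when the table is not yet in the Hive catalog
    if (!odsHiveCatalog.tableExists(new ObjectPath("ods", "hive_table"))) {
      streamTableEnv.executeSql(createHiveTableDdl)
    }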

> On Jul 16, 2020, at 12:07, Zhou Zach wrote:
> 
> Hi all,
> Does CREATE TABLE in flink sql 1.11 not support IF NOT EXISTS?
> 
> 
> Query:
>val hiveConfDir = "/etc/hive/conf" 
>val hiveVersion = "2.1.1"
> 
>val odsCatalog = "odsCatalog"
>val odsHiveCatalog = new HiveCatalog(odsCatalog, "ods", hiveConfDir, 
> hiveVersion)
>streamTableEnv.registerCatalog(odsCatalog, odsHiveCatalog)
> 
>streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>streamTableEnv.executeSql(
>  """
>|
>|CREATE TABLE IF NOT EXISTS odsCatalog.ods.hive_table (
>|  user_id STRING,
>|  age INT
>|) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet 
> TBLPROPERTIES (
>|  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>|  'sink.partition-commit.trigger'='partition-time',
>|  'sink.partition-commit.delay'='0s',
>|  'sink.partition-commit.policy.kind'='metastore'
>|)
>|
>|""".stripMargin)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> java.util.concurrent.CompletionException: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.
>   at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  ~[?:1.8.0_161]
>   at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  ~[?:1.8.0_161]
>   at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
> ~[?:1.8.0_161]
>   at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>  ~[?:1.8.0_161]
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_161]
>   at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_161]
>   at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
>  ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
>  ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_161]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [?:1.8.0_161]
>   at 
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>  [data-flow-1.0.jar:?]
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
> [data-flow-1.0.jar:?]
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>  [data-flow-1.0.jar:?]
>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [data-flow-1.0.jar:?]
>   at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [data-flow-1.0.jar:?]
>   at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [data-flow-1.0.jar:?]
>   at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [data-flow-1.0.jar:?]
> Caused by: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.
>   ... 11 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error: SQL parse failed. Encountered "NOT" at line 3, 
> column 17.
> Was expecting one of:
> 
>"ROW" ...
>"COMMENT" ...
>"LOCATION" ...
>"PARTITIONED" ...
>"STORED" ...
>"TBLPROPERTIES" ...
>"(" ...
>"." ...
> 
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>  ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>  ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>  ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>   ... 10 more
> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
> Encountered "NOT" at line 3, column 17.
> Was expecting one of:
> 
>"ROW" ...
>"COMMENT" ...
>"LOCATION" ...
>"PARTITIONED" ...
>"STORED" ...
>"TBLPROPERTIES" ...
>"(" ...
>"." ...
> 
>   at 
> 

Re: flink 1.11 upsert result is wrong

2020-07-15 Post by Leonard Xu


> On Jul 16, 2020, at 11:44, 小学生 <201782...@qq.com> wrote:
> 
> t_env.execute_sql('''delete from source_tab where trck_id='aew' ''')

The table you define here is a table in Flink that maps to a table in your external system (the MySQL database). Flink does not support DELETE on a table [1]; Flink
is a compute engine
whose main job is reading from and writing to external systems. Modifying data in an external system currently only happens on write (insert), and mainly to guarantee data-consistency semantics Delete messages are sent to the downstream system;
handling of these delete messages is done by each connector itself, so users do not call delete explicitly. You can look at [2] for more details.
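
If the goal is simply to remove that row from the MySQL table, the DELETE can be issued directly against MySQL outside of Flink. A minimal sketch, assuming the pymysql package (not part of this thread) and the connection details from the quoted DDL:

    import pymysql

    # delete the row directly in MySQL; Flink is not involved here
    conn = pymysql.connect(host='10.2.2.77', port=3306, user='root',
                           password='123456t', database='bdt')
    try:
        with conn.cursor() as cursor:
            cursor.execute("DELETE FROM g WHERE trck_id = %s", ('aew',))
        conn.commit()
    finally:
        conn.close()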

Best
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/
 

[2]https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html
 

 

Re: Help needed testing the Flink 1.11 built-in connectors

2020-07-15 Post by godfrey he
In 1.11, tableResult.print only supports exactly-once semantics, which requires checkpointing to be configured.
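
A minimal pyflink sketch of enabling checkpointing before creating the table environment (the 5-second interval is only an example value):

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings

    env = StreamExecutionEnvironment.get_execution_environment()
    env.enable_checkpointing(5000)  # checkpoint every 5 seconds so print() can emit results
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)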

1.12 is planned to support at-least-once semantics, so users will no longer need to configure checkpointing. The PR [1] is currently under review.

[1] https://github.com/apache/flink/pull/12867

Best,
Godfrey

Jingsong Li wrote on Thu, Jul 16, 2020 at 11:36 AM:

>  tableResult.print requires checkpointing
>
> Best,
> Jingsong
>
> On Thu, Jul 16, 2020 at 11:31 AM amen...@163.com  wrote:
>
> > hi, everyone
> >
> > As a newbie testing the three newly built-in connectors in Flink
> >
> 1.11, I created the job shown in the screenshot [1] locally and tried to print the data, but the console only printed the table schema and no data was produced by the built-in datagen
> connector rules. What could be the reason? Thanks for any explanation!
> >
> >
> > [1] https://postimg.cc/PprT9XV6
> >
> > best,
> > amenhub
> >
> >
> >
> > amen...@163.com
> >
>
>
> --
> Best, Jingsong Lee
>


flink sql 1.11 create hive table error

2020-07-15 Post by Zhou Zach
Hi all,
Does CREATE TABLE in flink sql 1.11 not support IF NOT EXISTS?


Query:
val hiveConfDir = "/etc/hive/conf" 
val hiveVersion = "2.1.1"

val odsCatalog = "odsCatalog"
val odsHiveCatalog = new HiveCatalog(odsCatalog, "ods", hiveConfDir, 
hiveVersion)
streamTableEnv.registerCatalog(odsCatalog, odsHiveCatalog)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
|
|CREATE TABLE IF NOT EXISTS odsCatalog.ods.hive_table (
|  user_id STRING,
|  age INT
|) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet 
TBLPROPERTIES (
|  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
|  'sink.partition-commit.trigger'='partition-time',
|  'sink.partition-commit.delay'='0s',
|  'sink.partition-commit.policy.kind'='metastore'
|)
|
|""".stripMargin)












java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_161]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_161]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_161]
at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
 [data-flow-1.0.jar:?]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
[data-flow-1.0.jar:?]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 [data-flow-1.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[data-flow-1.0.jar:?]
Caused by: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: SQL parse failed. Encountered "NOT" at line 3, column 
17.
Was expecting one of:
 
"ROW" ...
"COMMENT" ...
"LOCATION" ...
"PARTITIONED" ...
"STORED" ...
"TBLPROPERTIES" ...
"(" ...
"." ...

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
... 10 more
Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
Encountered "NOT" at line 3, column 17.
Was expecting one of:
 
"ROW" ...
"COMMENT" ...
"LOCATION" ...
"PARTITIONED" ...
"STORED" ...
"TBLPROPERTIES" ...
"(" ...
"." ...

at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76) 
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
 

Re: flink 1.11 upsert result is wrong

2020-07-15 Post by 小学生
Here is the full code with the delete:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE source_tab (
trck_id VARCHAR,
score INT,
PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
'table-name' = 'g', 
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
sink="""
CREATE TABLE sink_tab (
trck_id VARCHAR,
score INT,
PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
'table-name' = 'g_copy', 
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)


t_env.execute_sql(source)
t_env.execute_sql(sink)


t_env.execute_sql('''delete from source_tab where trck_id='aew' ''')
table_result1=t_env.execute_sql('''insert into sink_tab select * from 
source_tab ''')
table_result1.get_job_client().get_job_execution_result().result()

FlinkKafkaConsumer API dimension-table join

2020-07-15 Post by 郑斌斌
Hi all:

A question: when using the FlinkKafkaConsumer API, how can I do a dimension-table join the SQL way? (It worked before
with the Kafka API.)
 "select  a.id,b.name from kafka_table a "
+ "join dim_table FOR SYSTEM_TIME AS OF a.proctime as b on a.id = 
b.user_id";

thanks & Regards

Re: flink 1.11 upsert result is wrong

2020-07-15 Post by 小学生
Understood, thanks. So pyflink does not support this for now?

Re: flink 1.11 upsert result is wrong

2020-07-15 Post by Xingbo Huang
Hi,
Leonard is right. Apart from the UDF part, all of pyflink's APIs call into the functionality on the Java side; if the Java side does not have it, pyflink does not support it.

Best,
Xingbo

Leonard Xu wrote on Thu, Jul 16, 2020 at 11:09 AM:

> Hi,
>
> My understanding is that pyflink eventually goes through the Java code you are looking at. I am not very familiar with pyflink, cc xingbo to add more.
>
> Best
> Leonard Xu
>
> > On Jul 16, 2020, at 11:04, 小学生 <201782...@qq.com> wrote:
> >
> > Hi all, since I am not very familiar with Java, may I ask whether pyflink has anything for MySQL delete? I did not see it on the website. Thanks!
>
>


Re: flink 1.11 upsert result is wrong

2020-07-15 Post by Leonard Xu
Hi,

My understanding is that pyflink eventually goes through the Java code you are looking at. I am not very familiar with pyflink, cc xingbo to add more.

Best
Leonard Xu

> On Jul 16, 2020, at 11:04, 小学生 <201782...@qq.com> wrote:
> 
> Hi all, since I am not very familiar with Java, may I ask whether pyflink has anything for MySQL delete? I did not see it on the website. Thanks!



Re: flink 1.11 upsert result is wrong

2020-07-15 Post by 小学生
Hi all, since I am not very familiar with Java, may I ask whether pyflink has anything for MySQL delete? I did not see it on the website. Thanks!

HELP: flink 1.10 SQL with HBase reports a validateSchemaAndApplyImplicitCast error on INSERT INTO

2020-07-15 Post by Jim Chen
Hi,

I am using the SQL API of flink 1.10.1 with HBase 1.4.3. When writing to HBase, validateSchemaAndApplyImplicitCast reports an error, meaning the Query schema and the Sink schema are inconsistent. The main issue is that the Query
schema contains Row(EXPR$0), i.e. everything inside it is an expression, while the Sink
schema is Row(device_id) and the like. I do not know how to write the SQL so that it stays consistent with the HBase sink schema.

I tried something like select device_id as rowkey, ROW( device_id as ... ) as
f1, but aliases are not allowed inside ROW(); without them, the ROW schema in the Query is all expressions rather than concretely defined fields.

The number of fields in the query and the sink does match, and the type of each field matches too; it is just that the Query schema consists of expressions, so they cannot be made consistent.

The error message is as follows:
[image: image.png]

Key code:
HBase sink ddl:
String ddlSource = "CREATE TABLE
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
"  rowkey STRING,\n" +
"  f1 ROW< \n" +
"device_id STRING,\n" +
"pass_id STRING,\n" +
"first_date STRING,\n" +
"first_channel_id STRING,\n" +
"first_app_version STRING,\n" +
"first_server_time STRING,\n" +
"first_server_hour STRING,\n" +
"first_ip_location STRING,\n" +
"first_login_time STRING,\n" +
"sys_can_uninstall STRING,\n" +
"update_date STRING,\n" +
"server_time BIGINT,\n" +
"last_pass_id STRING,\n" +
"last_channel_id STRING,\n" +
"last_app_version STRING,\n" +
"last_date STRING,\n" +
"os STRING,\n" +
"attribution_channel_id STRING,\n" +
"attribution_first_date STRING,\n" +
"p_product STRING,\n" +
"p_project STRING,\n" +
"p_dt STRING\n" +
">\n" +
") WITH (\n" +
"  'connector.type' = 'hbase',\n" +
"  'connector.version' = '1.4.3',\n" + //
即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行
"  'connector.table-name' =
'dw_common_mobile_device_user_mapping_new',\n" +
"  'connector.zookeeper.quorum' = '"+ zookeeperServers
+"',\n" +
"  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
"  'connector.write.buffer-flush.max-size' = '2mb',\n" +
"  'connector.write.buffer-flush.max-rows' = '1000',\n" +
"  'connector.write.buffer-flush.interval' = '2s'\n" +
")";

insert into sql:

String bodyAndLocalSql = "" +
//"insert into
test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
"SELECT CAST(rowkey AS STRING) AS rowkey, " +
" ROW(" +
" device_id, pass_id, first_date, first_channel_id,
first_app_version, first_server_time, first_server_hour, first_ip_location,
first_login_time, sys_can_uninstall, update_date, server_time,
last_pass_id, last_channel_id, last_app_version, last_date, os,
attribution_channel_id, attribution_first_date, p_product, p_project, p_dt
" +
") AS f1" +
" FROM " +
"(" +
" SELECT " +
" MD5(CONCAT_WS('|', kafka.uid, kafka.p_product,
kafka.p_project)) AS rowkey, " +
" kafka.uid AS device_id " +
",kafka.pass_id " +

// first_date
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') " +
// existing user
" ELSE hbase.first_date END AS first_date " +

// first_channel_id
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN kafka.wlb_channel_id" +
// existing user
" ELSE hbase.first_channel_id END AS first_channel_id " +

// first_app_version
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN kafka.app_version " +
// existing user
" ELSE hbase.first_app_version END AS first_app_version " +

// first_server_time
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd
HH:mm:ss') " +
// existing user
" ELSE hbase.first_server_time END AS first_server_time " +

// first_server_hour
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
" THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
// existing user
" ELSE hbase.first_server_hour END AS first_server_hour " +

// first_ip_location
",CASE WHEN COALESCE(hbase.server_time, 0) <=
kafka.server_time " +
// new user
 

Re: Re: flink 1.11 sql type issue

2020-07-15 Post by sunfulin



hi, leonard
Thanks for the reply. I added this to the WITH options of the ES DDL, but it still seems to fail. Let me briefly describe my scenario:
My ES sink DDL is as follows:
create table es_sink (
  a varchar,
  b varchar,
  c TIMESTAMP(9) WITH LOCAL TIME ZONE
) with (
  
)


I use a processing-time attribute and convert the stream's proctime into a UTC-style timestamp, which is written as field c. Is this natively supported now? In 1.10 it could apparently be written directly, but in 1.11 it is written without the time zone, which breaks compatibility with the previous format.














On 2020-07-16 09:40:06, "Leonard Xu" wrote:
>Hello
>
>Parsing UTC timestamps in JSON is supported. In the WITH options, try specifying the timestamp format of the JSON data: json.timestamp-format.standard 
>= 'ISO-8601'
>
>Best
>Leonard Xu
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> 
>
>
>> On Jul 15, 2020, at 23:19, sunfulin wrote:
>> 
>> hi,
>> I defined an ES sink with flink sql, and one of its fields is declared as eventTime TIMESTAMP(9) WITH LOCAL 
>> TIME ZONE.
>> When trying to write, the exception below is thrown; it seems the json parser cannot handle this type. How should I write a UTC-style timestamp type, in a format like 
>> 2020-07-15T12:00:00.000Z 
>> 
>> 
>> 
>> java.lang.UnsupportedOperationException: Not support to parse type: 
>> TIMESTAMP(9) WITH LOCAL TIME ZONE
>> 
>> at 
>> org.apache.flink.formats.json.JsonRowDataSerializationSchema.createNotNullConverter(JsonRowDataSerializationSchema.java:184)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On 2020-07-15 21:24:30, "sunfulin" wrote:
>>> hi,
>>> I see that in 1.11 java.sql.Timestamp 
>>> maps to Flink's TIMESTAMP(9), which differs from the previous default TIMESTAMP(3); also, in 1.10 Timestamp(3) carried the UTC time zone, while this type no longer has a time zone. How should we adapt to this change?
>


Re: flink 1.11 sql type issue

2020-07-15 Post by Leonard Xu
Hello

Parsing UTC timestamps in JSON is supported. In the WITH options, try specifying the timestamp format of the JSON data: json.timestamp-format.standard 
= 'ISO-8601'

Best
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard
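
For illustration, a sketch of where the option goes in the sink DDL (the connector options and the TIMESTAMP precision here are placeholders, not the original ones):

    CREATE TABLE es_sink (
      a VARCHAR,
      b VARCHAR,
      c TIMESTAMP(3) WITH LOCAL TIME ZONE
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = 'http://localhost:9200',
      'index' = 'my_index',
      'format' = 'json',
      -- serialize/parse timestamps in ISO-8601 form, e.g. 2020-07-15T12:00:00.000Z
      'json.timestamp-format.standard' = 'ISO-8601'
    )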
 


> On Jul 15, 2020, at 23:19, sunfulin wrote:
> 
> hi,
> I defined an ES sink with flink sql, and one of its fields is declared as eventTime TIMESTAMP(9) WITH LOCAL TIME 
> ZONE.
> When trying to write, the exception below is thrown; it seems the json parser cannot handle this type. How should I write a UTC-style timestamp type, in a format like 
> 2020-07-15T12:00:00.000Z 
> 
> 
> 
> java.lang.UnsupportedOperationException: Not support to parse type: 
> TIMESTAMP(9) WITH LOCAL TIME ZONE
> 
> at 
> org.apache.flink.formats.json.JsonRowDataSerializationSchema.createNotNullConverter(JsonRowDataSerializationSchema.java:184)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-07-15 21:24:30, "sunfulin" wrote:
>> hi,
>> I see that in 1.11 java.sql.Timestamp 
>> maps to Flink's TIMESTAMP(9), which differs from the previous default TIMESTAMP(3); also, in 1.10 Timestamp(3) carried the UTC time zone, while this type no longer has a time zone. How should we adapt to this change?



Re: flink 1.11 sql type issue

2020-07-15 Post by sunfulin
hi,
I defined an ES sink with flink sql, and one of its fields is declared as eventTime TIMESTAMP(9) WITH LOCAL TIME 
ZONE.
When trying to write, the exception below is thrown; it seems the json parser cannot handle this type. How should I write a UTC-style timestamp type, in a format like 
2020-07-15T12:00:00.000Z 



java.lang.UnsupportedOperationException: Not support to parse type: 
TIMESTAMP(9) WITH LOCAL TIME ZONE

at 
org.apache.flink.formats.json.JsonRowDataSerializationSchema.createNotNullConverter(JsonRowDataSerializationSchema.java:184)











On 2020-07-15 21:24:30, "sunfulin" wrote:
>hi,
>I see that in 1.11 java.sql.Timestamp 
>maps to Flink's TIMESTAMP(9), which differs from the previous default TIMESTAMP(3); also, in 1.10 Timestamp(3) carried the UTC time zone, while this type no longer has a time zone. How should we adapt to this change?


Re: Re: flink 1.9.2 upgraded to 1.10.0: failed job cannot recover from checkpoint

2020-07-15 Post by chenxyz



Hello,
Peihui, you could check whether this is similar to the following issue; I ran into it on 1.10.0 as well.
http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html#a2239
Workarounds:
1. Using HDFS as the state backend does not hit the error.
2. Upgrading to 1.10.1 and keeping rocksdb also avoids the problem.
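
For workaround 1, a minimal flink-conf.yaml sketch (reusing the checkpoint paths from the configuration quoted further down):

    state.backend: filesystem
    state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
    state.savepoints.dir: hdfs:///flink/savepoints/wc/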














On 2020-07-14 14:41:53, "Peihui He" wrote:
>Hi Yun,
>
>I use a word count example: socket -> flatmap -> keyBy -> reduce ->
>print. In the flatmap I throw a runtimeException when a particular word appears. On 1.9.2
>the job can automatically recover the state of the last checkpoint; with 1.10.0 it cannot. The environment is flink on
>yarn.
>
>Best wishes.
>
Yun Tang wrote on Tue, Jul 14, 2020 at 11:57 AM:
>
>> Hi Peihui
>>
>> Your exception most likely happened while restoring from an incremental checkpoint: the files had already been downloaded locally, and when creating the hard link [1] the source file was gone. Very likely another exception occurred at that moment and made the restore procedure exit, so this is probably not the root
>> cause.
>>
>> [1]
>> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
>>
>>
>> Best
>> Yun Tang
>> 
>> From: Peihui He 
>> Sent: Tuesday, July 14, 2020 10:42
>> To: user-zh@flink.apache.org 
>> Subject: flink 1.9.2 upgraded to 1.10.0: failed job cannot recover from checkpoint
>>
>> hello,
>>
>> After upgrading to 1.10.0, when the program fails it tries to recover from the checkpoint, but it always fails with
>>
>>
>> Caused by: java.nio.file.NoSuchFileException:
>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> ->
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>>
>> The configuration is the same as on 1.9.2:
>> state.backend: rocksdb
>> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
>> state.savepoints.dir: hdfs:///flink/savepoints/wc/
>> state.backend.incremental: true
>>
>> The code also has
>>
>> env.enableCheckpointing(1);
>>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
>> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>>
>>
>>   Does 1.10.0 need any special configuration?
>>


flink 1.11 sql type issue

2020-07-15 Post by sunfulin
hi,
I see that in 1.11 java.sql.Timestamp 
maps to Flink's TIMESTAMP(9), which differs from the previous default TIMESTAMP(3); also, in 1.10 Timestamp(3) carried the UTC time zone, while this type no longer has a time zone. How should we adapt to this change?

Re: flink 1.11 sql kafka extracting event time

2020-07-15 Post by Benchao Li
I think this can be solved with a computed column: you just need to make sure the rowtime column is never null when it is computed; if the value is null, you could fall back to some default value.
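
For example, a sketch of such a computed column, falling back to epoch 0 when eventTime is null (pick whatever default suits the pipeline):

    rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(COALESCE(eventTime, 0) / 1000, 'yyyy-MM-dd HH:mm:ss'))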

18500348...@163.com <18500348...@163.com> wrote on Wed, Jul 15, 2020 at 3:04 PM:

> Hi all!
>
> I am using flink 1.11 SQL to read from Kafka with format csv,
> extracting the event time from the eventTime field:
> rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, 'yyyy-MM-dd
> HH:mm:ss'))
> eventTime may contain dirty data (values that are not 13-digit millisecond timestamps). With 'csv.ignore-parse-errors' = 'true',
> eventTime is then set to null, and this exception is thrown:
> Caused by: java.lang.RuntimeException: RowTime field should not be null,
> please convert it to a non-null long value.
>
> Is there a good way to solve this?
>
>
> Best!
>
>
>
> 18500348...@163.com
>


-- 

Best,
Benchao Li


Reply: Re: Reply: flink state

2020-07-15 Post by zhao liang
Broadcast 
state cannot satisfy your requirement. You probably have to do what I do: fold the state data involved into the data stream itself and distinguish the different record types inside the operator, which amounts to maintaining the changes of this "broadcast" stream by hand.

From: Robert.Zhang <173603...@qq.com>
Date: Wednesday, July 15, 2020 15:22
To: user-zh , user-zh@flink.apache.org 

Subject: Re: Reply: flink state
The thing is, the problem is that I need to use keyed state to modify the broadcast state, e.g. based on the keyed 
state store certain keys that meet a condition into this broadcast state, and use this broadcast state when other operators compute, for example because those keys are needed there.
The non-broadcast side mentioned in the docs cannot modify the broadcast state, it is read-only,
so there seems to be no direct way to implement this.





------ Original message ------
From: "zhao liang"
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian


Robert.Zhang <173603...@qq.com> wrote on Mon, Jul 13, 2020 at 9:50 PM:

 Hello, all
 I have run into a problem with a stream job:
 I want to use a global state in all keyed streams, or a global
 parameter. The main requirement is that this state is mutable: it needs to be modified and be visible to all stream
 operators. Has anyone met a similar scenario or can offer some ideas? Much appreciated.


 Best regards


springboot 2.3.1 + flink 1.11.0: how to pass in an external application.yml config file?

2020-07-15 Post by vw17
Hi,
The project integrates springboot and flink,
but some project configuration lives in application.yml and some of it needs to change in production. The usual approach before was to start with
-Dspring.config.location=xxx to point at an external config file. I would like to know whether this kind of override is supported when the jar is started with flink run.
Thanks



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink1.11.0window

2020-07-15 Post by 奇怪的不朽琴师
Got it, thank you.




------ Original message ------
From:
   "user-zh"

Re: pyflink1.11.0window

2020-07-15 Post by Shuiqiang Chen
The example below reads JSON-formatted data from kafka, performs a windowed aggregation, and writes the result to ES. You can use it as a reference for the code structure and adapt the data fields. This code will probably not run on your machine as-is.

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING())
def platform_code_to_name(code):
return "mobile" if code == 0 else "pc"


def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env,
environment_settings=env_settings)

source_ddl = """
CREATE TABLE payment_msg(
createTime VARCHAR,
rt as TO_TIMESTAMP(createTime),
orderId BIGINT,
payAmount DOUBLE,
payPlatform INT,
paySource INT,
WATERMARK FOR rt as rt - INTERVAL '2' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'payment_msg_2',
  'connector.properties.bootstrap.servers' = '0.0.0.0:9092',
  'connector.properties.group.id' = 'test_3',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json'
)
"""
t_env.sql_update(source_ddl)

es_sink_ddl = """
CREATE TABLE es_sink (
platform VARCHAR,
pay_amount DOUBLE,
rowtime TIMESTAMP(3)
) with (
'connector.type' = 'elasticsearch',
'connector.version' = '7',
'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'platform_pay_amount_1',
'connector.document-type' = 'payment',
'update-mode' = 'upsert',
'connector.flush-on-checkpoint' = 'true',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.bulk-flush.max-size' = '42mb',
'connector.bulk-flush.max-actions' = '32',
'connector.bulk-flush.interval' = '1000',
'connector.bulk-flush.backoff.delay' = '1000',
'format.type' = 'json'
)
"""

t_env.sql_update(es_sink_ddl)

t_env.register_function('platformcodetoname', platform_code_to_name)

query = """
select platformcodetoname(payPlatform) as platform, sum(payAmount)
as pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT)
as rowtime
from payment_msg
group by tumble(rt, interval '5' seconds), payPlatform
"""

count_result = t_env.sql_query(query)

t_env.create_temporary_view('windowed_values', count_result)

query2 = """
select platform, last_value(pay_amount), rowtime from
windowed_values group by platform, rowtime
"""

final_result = t_env.sql_query(query2)

final_result.execute_insert(table_path='es_sink')


if __name__ == '__main__':
log_processing()


奇怪的不朽琴师 <1129656...@qq.com> wrote on Wed, Jul 15, 2020 at 4:40 PM:

> Shuiqiang, hello:
>  
> Could I ask you to share a complete code example? I am a beginner; the official 2-from_kafka_to_kafka.py has no window, and I would like a demo that adds a window on top of it. I have tried for a long time without success; after adding windowing to that demo I keep running into all kinds of problems, which is quite painful. Any help would be greatly appreciated.
>
>
> A plea to all the experts who see this mail!
>
>
> Thanks!
> 谢谢!
>
>
>
>
> ------ Original message ------
> From:
>   "user-zh"
> <
> acqua@gmail.com>
> Sent: Wednesday, July 15, 2020 11:25 AM
> To: "user-zh"
> Subject: Re: pyflink1.11.0window
>
>
>
> An SQL example:
> select platformcodetoname(payPlatform) as platform, sum(payAmount) as
> pay_amount, cast(tumble_start(rt, interval '5' seconds) as BIGINT) as
> rowtime
> from payment_msg group by tumble(rt, interval '5' seconds), payPlatform
> This query aggregates over 5-second tumble windows.
>
> 奇怪的不朽琴师 <1129656...@qq.com> wrote on Wed, Jul 15, 2020 at 11:10 AM:
>
>  Shuiqiang, hello:
>
>  My goal is to produce an aggregated summary every fixed interval, for example every two seconds. How should I define the window for this requirement?
> 
> 
>  ------ Original message ------
>  From:
>  "user-zh"
>  <
>  acqua@gmail.com>
>  Sent: Wednesday, July 15, 2020 10:51 AM
>  To: "user-zh"
>  Subject: Re: pyflink1.11.0window
>
>
>
>  Hi 琴师,
>  The exception stack org.apache.flink.table.api.ValidationException: A tumble window
>  expects a size value literal.
>  suggests that the code around the tumble window definition is not quite right.
>
>  Best,
>  Shuiqiang
>
>  奇怪的不朽琴师 <1129656...@qq.com> wrote on Wed, Jul 15, 2020 at 10:27 AM:
>
>  > Hello:
>  >
>  > I changed the source as you suggested in your reply, but now I get a new error. What causes this? I have been trying to debug a window for a long time without success; please help me, thank you.
>  > Traceback (most recent call last):
>  >   File "tou.py", line 71, in    from_kafka_to_kafka_demo()
>  >   File "tou.py", line 21, in
> from_kafka_to_kafka_demo

Re: Flink 1.11 submit job timed out

2020-07-15 Post by SmileSmile
Hi Roc

This did not happen on 1.10.1; it only shows up in 1.11. How would you suggest investigating it?



| |
a511955993
|
|
Mail: a511955...@163.com
|

Signature customized by NetEase Mail Master

On 07/15/2020 17:16, Roc Marshal wrote:
Hi,SmileSmile.
I have run into a similar hostname-resolution problem before; you could start by checking the network mapping of the k8s pod nodes.
Hope this helps.


Best.
Roc Marshal











On 2020-07-15 17:04:18, "SmileSmile" wrote:
>
>Hi
>
>Flink 1.11, deployed as a kubernetes session, 30 TMs with 4 slots each, job 
>parallelism 120. When submitting the job, a large number of "No hostname could be resolved for the IP address" messages appear, the JM times 
>out, and the job submission fails. The web UI also hangs and becomes unresponsive.
>
>With a WordCount job at parallelism 1, a few "no hostname" log lines are printed and the job is then submitted normally; once the parallelism goes up, it times out.
>
>
>Some of the logs:
>
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.32.160.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.44.224.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,461 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.40.32.9, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The 
>heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
>Disconnect job manager 
>0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2
> for job e1554c737e37ed79688a15c746b6e9ef from the resource manager.
>
>
>how to deal with ?
>
>
>beset !
>
>| |
>a511955993
>|
>|
>Mail: a511955...@163.com
>|
>
>Signature customized by NetEase Mail Master


Re:Flink 1.11 submit job timed out

2020-07-15 Post by Roc Marshal
Hi, SmileSmile.
I have run into a similar hostname-resolution problem before; you could start by checking the network mapping of the k8s pod nodes.
Hope this helps.


Best.
Roc Marshal











On 2020-07-15 17:04:18, "SmileSmile" wrote:
>
>Hi
>
>Flink 1.11, deployed as a kubernetes session, 30 TMs with 4 slots each, job 
>parallelism 120. When submitting the job, a large number of "No hostname could be resolved for the IP address" messages appear, the JM times 
>out, and the job submission fails. The web UI also hangs and becomes unresponsive.
>
>With a WordCount job at parallelism 1, a few "no hostname" log lines are printed and the job is then submitted normally; once the parallelism goes up, it times out.
>
>
>Some of the logs:
>
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.32.160.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,460 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.44.224.7, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>2020-07-15 16:58:46,461 WARN  
>org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
>could be resolved for the IP address 10.40.32.9, using IP address as host 
>name. Local input split assignment (such as for HDFS files) may be impacted.
>
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The 
>heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
>2020-07-15 16:59:10,236 INFO  
>org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
>Disconnect job manager 
>0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2
> for job e1554c737e37ed79688a15c746b6e9ef from the resource manager.
>
>
>how to deal with ?
>
>
>beset !
>
>| |
>a511955993
>|
>|
>Mail: a511955...@163.com
>|
>
>Signature customized by NetEase Mail Master


Re: [Help] Flink Hadoop dependency problem

2020-07-15 Post by Roc Marshal



Hi, Z-Z,

You can try downloading the matching uber jar from 
https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/ 
and putting the downloaded jar file under ${FLINK_HOME}/lib in the flink image, then start the composed containers.
Best.
Roc Marshal.
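
With the docker-compose file quoted below, one way to do this is an extra volume mount so the jar lands in /opt/flink/lib. A sketch, assuming the uber jar (the version number here is only an example) has been downloaded into ./data; the taskmanager service needs the same mount:

    services:
      jobmanager:
        image: flink:1.11.0-scala_2.12
        volumes:
          - ./jobmanager/conf:/opt/flink/conf
          - ./data:/data
          # mount the shaded Hadoop uber jar into Flink's lib directory
          - ./data/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar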











On 2020-07-15 10:47:39, "Z-Z" wrote:
>I am using Flink 1.11.0, set up with docker-compose; the docker-compose file is as follows:
>version: "2.1"
>services:
> jobmanager:
>  image: flink:1.11.0-scala_2.12
>  expose:
>   - "6123"
>  ports:
>   - "8081:8081"
>  command: jobmanager
>  environment:
>   - JOB_MANAGER_RPC_ADDRESS=jobmanager
>   - 
>HADOOP_CLASSPATH=/data/hadoop-2.9.2/etc/hadoop:/data/hadoop-2.9.2/share/hadoop/common/lib/*:/data/hadoop-2.9.2/share/hadoop/common/*:/data/hadoop-2.9.2/share/hadoop/hdfs:/data/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/data/hadoop-2.9.2/share/hadoop/hdfs/*:/data/hadoop-2.9.2/share/hadoop/yarn:/data/hadoop-2.9.2/share/hadoop/yarn/lib/*:/data/hadoop-2.9.2/share/hadoop/yarn/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar
>  volumes:
>   - ./jobmanager/conf:/opt/flink/conf
>   - ./data:/data
>
>
> taskmanager:
>  image: flink:1.11.0-scala_2.12
>  expose:
>   - "6121"
>   - "6122"
>  depends_on:
>   - jobmanager
>  command: taskmanager
>  links:
>   - "jobmanager:jobmanager"
>  environment:
>   - JOB_MANAGER_RPC_ADDRESS=jobmanager
>  volumes:
>   - ./taskmanager/conf:/opt/flink/conf
>networks:
> default:
>  external:
>   name: flink-network
>
>
>
>hadoop-2.9.2 is already in the data directory, and HADOOP_CLASSPATH has been added to the environment variables of both the jobmanager and the taskmanager, but whether I submit through the CLI or the web UI the jobmanager still reports Could
> not find a file system implementation for scheme 'hdfs'. Does anyone know what is going on?


Flink 1.11 submit job timed out

2020-07-15 Post by SmileSmile

Hi

Flink 1.11, deployed as a kubernetes session, 30 TMs with 4 slots each, job 
parallelism 120. When submitting the job, a large number of "No hostname could be resolved for the IP address" messages appear, the JM times 
out, and the job submission fails. The web UI also hangs and becomes unresponsive.

With a WordCount job at parallelism 1, a few "no hostname" log lines are printed and the job is then submitted normally; once the parallelism goes up, it times out.


Some of the logs:

2020-07-15 16:58:46,460 WARN  
org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
could be resolved for the IP address 10.32.160.7, using IP address as host 
name. Local input split assignment (such as for HDFS files) may be impacted.
2020-07-15 16:58:46,460 WARN  
org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
could be resolved for the IP address 10.44.224.7, using IP address as host 
name. Local input split assignment (such as for HDFS files) may be impacted.
2020-07-15 16:58:46,461 WARN  
org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
could be resolved for the IP address 10.40.32.9, using IP address as host name. 
Local input split assignment (such as for HDFS files) may be impacted.

2020-07-15 16:59:10,236 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - The 
heartbeat of JobManager with id 69a0d460de46a9f41c770d963c0a timed out.
2020-07-15 16:59:10,236 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Disconnect job manager 
0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2
 for job e1554c737e37ed79688a15c746b6e9ef from the resource manager.


how to deal with ?


beset !

| |
a511955993
|
|
Mail: a511955...@163.com
|

Signature customized by NetEase Mail Master

Re: Chinese characters garbled after FlinkSQL writes into MySQL

2020-07-15 Post by wangl...@geekplus.com.cn

It was the character-set configuration of the server hosting MySQL_tableB.
Adding the following configuration fixed it.


[mysqld]
character-set-server=utf8
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8




wangl...@geekplus.com.cn 

From: wangl...@geekplus.com.cn
Sent: 2020-07-15 16:34
To: user-zh
Subject: Chinese characters garbled after FlinkSQL writes into MySQL
 
KafkaTable: the Kafka messages
MySQL_tableA:  dimension table; its values are Chinese characters
MySQL_tableB:  result table of the join, on a different server from MySQL_tableA.
 
When I SELECT directly in the flink sql client the characters display fine, but after INSERT INTO MySQL_tableB SELECT, when I look at 
MySQL_tableB the Chinese characters are garbled.
Does anyone have any suggestions?
 
Thanks,
Wang Lei
 
 
 
wangl...@geekplus.com.cn 


Re: flink 1.11: using JdbcRowDataOutputFormat in a custom RichFlatMapFunction to write to PostgreSQL; RuntimeContext initialization problem (NullPointerException or RuntimeContext not initialized); what am I using incorrectly?

2020-07-15 Post by jindy_liu
It is indeed that line that causes it.
Given it has all been refactored, what is the recommended way to use this?
I need to know whether each row corresponds to an insert, update or delete event.
Or, to put the question differently: for this kind of API, what rules should one follow so that the code is more compatible across Flink versions?
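
This does not address the RuntimeContext problem itself, but for telling insert/update/delete apart per record: in Flink 1.11 a RowData record carries its change kind, so a rough sketch (the class name is hypothetical) could look like:

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.table.data.RowData
    import org.apache.flink.types.RowKind
    import org.apache.flink.util.Collector

    // Sketch: route each RowData record according to its change kind.
    class ChangeKindRouter extends RichFlatMapFunction[RowData, RowData] {
      override def flatMap(row: RowData, out: Collector[RowData]): Unit =
        row.getRowKind match {
          case RowKind.INSERT | RowKind.UPDATE_AFTER  => out.collect(row) // treat as an upsert
          case RowKind.DELETE | RowKind.UPDATE_BEFORE => ()               // handle deletes/retractions here
        }
    }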



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink1.11.0window

2020-07-15 Post by 奇怪的不朽琴师
Shuiqiang, hello:

Could I ask you to share a complete code example? I am a beginner; the official 2-from_kafka_to_kafka.py has no window, and I would like a demo that adds a window on top of it. I have tried for a long time without success and keep running into all kinds of problems, which is quite painful. Any help would be greatly appreciated.

A plea to all the experts who see this mail!


Thanks!




------ Original message ------
From:
   "user-zh"

Re: flink 1.9.2 upgraded to 1.10.0: failed job cannot recover from checkpoint

2020-07-15 Post by Robin Zhang
As far as I know, you cannot restore directly from a checkpoint across major versions; the only option is to give up the state and rerun.

Best
Robin Zhang

From: Peihui He <[hidden email]>
Sent: Tuesday, July 14, 2020 10:42
To: [hidden email] <[hidden email]>
Subject: flink 1.9.2 upgraded to 1.10.0: failed job cannot recover from checkpoint

hello,

After upgrading to 1.10.0, when the program fails it tries to recover from the checkpoint, but it always fails with


Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
->
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

The configuration is the same as on 1.9.2:
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
state.savepoints.dir: hdfs:///flink/savepoints/wc/
state.backend.incremental: true

The code also has

env.enableCheckpointing(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));


  Does 1.10.0 need any special configuration?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Reply: flink state

2020-07-15 Post by Robert.Zhang
The thing is, the problem is that I need to use keyed state to modify the broadcast state, e.g. based on the keyed
state store certain keys that meet a condition into this broadcast
state, and use this broadcast state when other operators compute, for example because those keys are needed there.
The non-broadcast side mentioned in the docs cannot modify the broadcast state, it is read-only,
so there seems to be no direct way to implement this.




------ Original message ------
From: "zhao liang"
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian


Robert.Zhang <173603...@qq.com> wrote on Mon, Jul 13, 2020 at 9:50 PM:

 Hello, all
 I have run into a problem with a stream job:
 I want to use a global state in all keyed streams, or a global
 parameter. The main requirement is that this state is mutable: it needs to be modified and be visible to all stream
 operators. Has anyone met a similar scenario or can offer some ideas? Much appreciated.


 Best regards
 Best regards

flink 1.11 sql kafka extracting event time

2020-07-15 Post by 18500348...@163.com
Hi all!

I am using flink 1.11 SQL to read from Kafka with format csv,
extracting the event time from the eventTime field:
rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, 'yyyy-MM-dd HH:mm:ss'))
eventTime may contain dirty data (values that are not 13-digit millisecond timestamps). With 'csv.ignore-parse-errors' = 'true', 
eventTime is then set to null, and this exception is thrown:
Caused by: java.lang.RuntimeException: RowTime field should not be null, please 
convert it to a non-null long value.

Is there a good way to solve this?


Best!



18500348...@163.com


Re: flink on yarn logging

2020-07-15 Post by Cayden chen

This can be done with a custom logback appender: package the custom appender into the jar shipped with the Flink taskmanager, and pass the applicationId to the appender via MDC.put(), so the appender can attach the appId to the log records it ships.




------ Original message ------
From:
   "user-zh"

https://logging.apache.org/log4j/2.x/manual/appenders.html

Best,
Yangze Guo

On Tue, Jul 14, 2020 at 4:46 PM nicygan wrote:
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
>
> Best,
> Yangze Guo
>
> On Mon, Jul 13, 2020 at 2:40 PM <13162790...@163.com> wrote:
> >
> > 1 ...
> > 2 ... per-job
> > 3 cdh flink.1.10
> >
> > On 2020-07-13 11:18:46, "Yangze Guo" wrote:
> > > [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> > > [2] https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> > > [3] https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Mon, Jul 13, 2020 at 10:49 AM <13162790...@163.com> wrote:
> > > >
> > > > 1 With flink on yarn, how can the taskmanager logs ... be collected to es per taskmanager?
> > > > 2 With flink on yarn, how can I tell ... on which taskmanager / jobmanager a task runs?