Re:Re: flink sql 消费kafka失败

2020-06-09 文章 Zhou Zach
感谢回复,写入Kafka的时间戳改成"2020-06-10T12:12:43Z",消费成功了

















在 2020-06-10 13:25:01,"Leonard Xu"  写道:
>Hi,
>
>> Caused by: java.io.IOException: Failed to deserialize JSON object.
>
>报错信息说了是 json 解析失败了,按照之前大家踩的坑,请检查下两点:
>(1)json 中timestamp数据的格式必须是"2020-06-10T12:12:43Z", 不能是 long 
>型的毫秒,社区已有issue跟进,还未解决
>(2)kafka 对应topic 检查下是否有脏数据,“earliest-offset’” 会从topic的第一条数据开始消费
>
>祝好
>Leonard Xu


Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-09 文章 Zhefu PENG
Hi Yichao,

感谢你的回复。因为这个任务已经上线大概一周了,今天才报出这个问题,我们后面会增大间隔并测试。同时,我在刚刚也有回复,我在TM也查到了一些相关日志:
2020-06-10 12:44:40,688 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Pending record count must be zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be
zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
... 8 more

是不是还是和checkpoint的设立间隔过短有关呢?希望回复,感谢!

Best,
Zhefu

Yichao Yang <1048262...@qq.com> 于2020年6月10日周三 下午1:24写道:

> Hi
>
>
>
> 看报错是checkpoint失败次数超过了最大限制导致任务失败。checkpoint间隔设置太小了,在我们团队通常都是分钟级别的interval,我们一般设置5分钟,checkpoint只是一个容错机制,没有特殊的需求场景不需要设置间隔那么短,并且频繁checkpoint会导致性能问题。
>
>
> Best,
> Yichao Yang
>
>
> -- 原始邮件 --
> 发件人: Zhefu PENG  发送时间: 2020年6月10日 13:04
> 收件人: user-zh  主题: 回复:flink任务checkpoint无法完成snapshot,且报kafka异常
>
>
>
> Hi all,
>
> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> Source: Custom Source - Map - Source_Map - Empty_Filer -
> Field_Filter
> - Type_Filter - Value_Filter - Map - Map - Map -
> Sink: Unnamed
>
>
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
>
> 部分报错信息如下:
> 2020-06-10 12:02:49,083 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering
> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
> 2020-06-10 12:04:47,898 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Decline
> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> c41f4811262db1c4c270b136571c8201 at
> container_e27_1591466310139_21670_01_06 @
> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> 2020-06-10 12:04:47,899 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Discarding
> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
> snapshot 1 for operator Source: Custom Source - Map - Source_Map
> -
> Empty_Filer - Field_Filter - Type_Filter - Value_Filter -
> Map - Map -
> Map - Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
> at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at
>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
>
> 

Re: flink sql 消费kafka失败

2020-06-09 文章 Leonard Xu
Hi,

> Caused by: java.io.IOException: Failed to deserialize JSON object.

报错信息说了是 json 解析失败了,按照之前大家踩的坑,请检查下两点:
(1)json 中timestamp数据的格式必须是"2020-06-10T12:12:43Z", 不能是 long 型的毫秒,社区已有issue跟进,还未解决
(2)kafka 对应topic 检查下是否有脏数据,“earliest-offset’” 会从topic的第一条数据开始消费

祝好
Leonard Xu

回复:flink sql 消费kafka失败

2020-06-09 文章 Yichao Yang
Hi


看报错应该是kafKa有脏数据。


Best,
Yichao Yang



发自我的iPhone


-- 原始邮件 --
发件人: Zhou Zach 

回复:flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-09 文章 Yichao Yang
Hi


看报错是checkpoint失败次数超过了最大限制导致任务失败。checkpoint间隔设置太小了,在我们团队通常都是分钟级别的interval,我们一般设置5分钟,checkpoint只是一个容错机制,没有特殊的需求场景不需要设置间隔那么短,并且频繁checkpoint会导致性能问题。


Best,
Yichao Yang


-- 原始邮件 --
发件人: Zhefu PENG 

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-09 文章 Zhefu PENG
补充一下,在TaskManager发现了如下错误日志:

2020-06-10 12:44:40,688 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Pending record count must be zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be
zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
... 8 more

希望得到帮助,感谢!


Zhefu PENG  于2020年6月10日周三 下午1:03写道:

> Hi all,
>
> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter
> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
>
>
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
>
> 部分报错信息如下:
> 2020-06-10 12:02:49,083 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
> 2020-06-10 12:04:47,898 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> c41f4811262db1c4c270b136571c8201 at
> container_e27_1591466310139_21670_01_06 @
> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> 2020-06-10 12:04:47,899 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Source: Custom Source -> Map -> Source_Map
> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map
> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: The server disconnected before a response was 

Re: 关于flinksql between问题

2020-06-09 文章 Leonard Xu
Hi,

看你描述的想要的是自定义source(左表),  需要同一张mysql 维表做join,如果是这样的话,你的sql看起来有点问题,目前的写法是regular 
join, 维表join的语法[1]:

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency
另外JDBC connector是实现了LookupSource的,也就是支持做维表,维表的更新的 connector.lookup.cache.ttl 
参数控制维表中cache的过期时间,不知道是否满足你的需求。

Best,
Leonard Xu

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector
 


> 在 2020年6月10日,10:43,小屁孩 <932460...@qq.com> 写道:
> 
> hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗?
> 
> 
> 
> 
> --原始邮件--
> 发件人:"wangweigu...@stevegame.cn" 发送时间:2020年6月9日(星期二) 晚上6:35
> 收件人:"user-zh" 
> 主题:回复: 回复: 关于flinksql between问题
> 
> 
> 
> 
>  我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的!
>  
> 会报你下面的错误:
>  Exception in thread "main" org.apache.flink.table.api.TableException: 
> Cannot generate a valid execution plan for the given query: 
> 
> LogicalProject(num=[$0])
>  LogicalJoin(condition=[AND(($0, $1), <($0, $2))], joinType=[inner])
>  FlinkLogicalDataStreamScan(id=[1], fields=[num])
>  FlinkLogicalDataStreamScan(id=[2], fields=[startNum, 
> endNum])
> 
> This exception indicates that the query uses an unsupported SQL feature.
> 
> 
> 
> 
> 
> 发件人: 小屁孩
> 发送时间: 2020-06-09 17:41
> 收件人: user-zh
> 主题: 回复: 关于flinksql between问题
> hi,我使用的是nbsp;
> 1 flink1.9.0
> 2 oldplanner
>  
> 
>  
> 3 streaming mode
> 4. 代码类似如下
> nbsp; nbsp; val sqlStream = env.createInput(jdbcInput)
> nbsp; nbsp; 
> tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip)
> nbsp; nbsp; tnv.registerDataStream("OMstream",value,'ip)
> //nbsp; nbsp; val table = tnv.sqlQuery("select * fromnbsp; 
> OMstream asnbsp; a left join sqlStream asnbsp; b on a.ip 
> gt;b.start_ip and a.ip nbsp; nbsp; val table = tnv.sqlQuery("select b.netstruct_id 
> fromnbsp; OMstream asnbsp; a left join sqlStream as b on a.ip 
> gt; b.start_ip and a.ip  nbsp; nbsp; val resRow = table.toRetractStream[Row]
> 
> 5 报错信息如下
> Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
> generate a valid execution plan for the given query:nbsp;
> 
> 
> LogicalProject(netstruct_id=[$1])
> nbsp; LogicalJoin(condition=[AND(gt;($0, $2), <($0, $3))], 
> joinType=[left])
> nbsp; nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip])
> nbsp; nbsp; FlinkLogicalDataStreamScan(id=[2], 
> fields=[netstruct_id, start_ip, end_ip])
> 
> 
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> at 
> org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
> at 
> org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
> at 
> org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
> at 
> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
> at 
> org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
> at 
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at 
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
> at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
> at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
> at 
> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
> at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
> at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)
> 
> 
> 
> 
> 
> 6 我也尝试使用了nbsp;
> select b.netstruct_id fromnbsp; OMstream asnbsp; a left join 
> sqlStream as b on a.ip gt; b.start_ip
> 

flink sql 消费kafka失败

2020-06-09 文章 Zhou Zach




Exception in thread "main" java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
994bd5a683143be23a23d77ed005d20d)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
at org.rabbit.sql.FromKafkaSinkMysql$.main(FromKafkaSinkMysql.scala:66)
at org.rabbit.sql.FromKafkaSinkMysql.main(FromKafkaSinkMysql.scala)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 994bd5a683143be23a23d77ed005d20d)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 31 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown 

flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-09 文章 Zhefu PENG
Hi all,

现在有一个简单的flink任务,大概chain在一起后的执行图为:
Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter
-> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed

但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。

部分报错信息如下:
2020-06-10 12:02:49,083 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
2020-06-10 12:04:47,898 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
c41f4811262db1c4c270b136571c8201 at
container_e27_1591466310139_21670_01_06 @
hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
2020-06-10 12:04:47,899 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 1 for operator Source: Custom Source -> Map -> Source_Map ->
Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map ->
Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
Failed to send data to Kafka: The server disconnected before a response was
received.
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:973)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:317)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:978)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotStat(AbstractStreamOperator.java:402)
... 18 more
Caused by: org.apache.kafka.common.errors.NetworkException: The server
disconnected before a response was received.
2020-06-10 12:04:47,913 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
at

?????? ????FlinkSQL????operatoer??????savepoint??????????????

2020-06-09 文章 kcz
tks




----
??:"Yichao Yang"<1048262...@qq.com;
:2020??6??10??(??) 11:32
??:"user-zh"https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
amp;gt;
amp;gt; 
amp;gt;
amp;gt; ?S 

?????? ????FlinkSQL????operatoer??????savepoint??????????????

2020-06-09 文章 Yichao Yang
Hi


Flink sql 
??uidsql??datastream 
api


Best,
Yichao Yang




----
??:"kcz"<573693...@qq.com;
:2020??6??10??(??) 11:27
??:"user-zh"https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
gt;
gt; 
gt;
gt; ?S 

?????? ????FlinkSQL????operatoer??????savepoint??????????????

2020-06-09 文章 kcz
sql 
operatorID??ID




----
??:"??"https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

 

 ?S 

Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

2020-06-09 文章 方盛凯
我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯  于2020年6月9日周二 下午9:26写道:

>
> 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
> 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
>
> 如有错误,欢迎补充回答。
>
> 陈赋赟  于2020年6月8日周一 上午11:53写道:
>
>> 原先sql任务是:
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>  1
>> FROM
>> A_source
>> ;
>> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
>>
>>
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> CREATE TABLE C_source(...)
>> CREATE TABLE D_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>  1
>> FROM
>> A_source
>> ;
>>
>>
>> INSERT INTO C_sink
>> SELECT
>>  1
>> FROM
>> D_source
>> ;
>> 并基于Savepoint提交,结果显示
>>
>> Cannot map checkpoint/savepoint state for operator
>> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
>> is not available in the new program.
>> If you want to allow to skip this, you can set the
>> --allowNonRestoredState option on the CLI.
>>
>>
>> 想请教一下底层是因为什么原因导致了opertor匹配不上?
>
>


Re: 回复: 关于flinksql between问题

2020-06-09 文章 Benchao Li
你的意思是你的mysql维表是自定义的,然后是定期更新的维表内容是么?只要你实现的是LookupSource,应该是没问题的。
内部实现你可以自己控制。

小屁孩 <932460...@qq.com> 于2020年6月10日周三 上午10:46写道:

> hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表
> 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗?
>
>
>
>
> --原始邮件--
> 发件人:"wangweigu...@stevegame.cn" 发送时间:2020年6月9日(星期二) 晚上6:35
> 收件人:"user-zh"
> 主题:回复: 回复: 关于flinksql between问题
>
>
>
>
>  我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的!
> 
> 会报你下面的错误:
>  Exception in thread "main"
> org.apache.flink.table.api.TableException: Cannot generate a valid
> execution plan for the given query:
>
> LogicalProject(num=[$0])
>  LogicalJoin(condition=[AND(($0, $1), <($0, $2))],
> joinType=[inner])
>  FlinkLogicalDataStreamScan(id=[1], fields=[num])
>  FlinkLogicalDataStreamScan(id=[2], fields=[startNum,
> endNum])
>
> This exception indicates that the query uses an unsupported SQL feature.
>
>
>
>
> 
> 发件人: 小屁孩
> 发送时间: 2020-06-09 17:41
> 收件人: user-zh
> 主题: 回复: 关于flinksql between问题
> hi,我使用的是nbsp;
> 1 flink1.9.0
> 2 oldplanner
>  
> 
>  
> 3 streaming mode
> 4. 代码类似如下
> nbsp; nbsp; val sqlStream = env.createInput(jdbcInput)
> nbsp; nbsp;
> tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip)
> nbsp; nbsp; tnv.registerDataStream("OMstream",value,'ip)
> //nbsp; nbsp; val table = tnv.sqlQuery("select * fromnbsp;
> OMstream asnbsp; a left join sqlStream asnbsp; b on a.ip
> gt;b.start_ip and a.ip nbsp; nbsp; val table = tnv.sqlQuery("select b.netstruct_id
> fromnbsp; OMstream asnbsp; a left join sqlStream as b on a.ip
> gt; b.start_ip and a.ip  nbsp; nbsp; val resRow = table.toRetractStream[Row]
> 
> 5 报错信息如下
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:nbsp;
> 
> 
> LogicalProject(netstruct_id=[$1])
> nbsp; LogicalJoin(condition=[AND(gt;($0, $2), <($0, $3))],
> joinType=[left])
> nbsp; nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip])
> nbsp; nbsp; FlinkLogicalDataStreamScan(id=[2],
> fields=[netstruct_id, start_ip, end_ip])
> 
> 
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
> at
> org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
> at
> org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
> at
> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
> at org.apache.flink.table.planner.StreamPlanner.org
> $apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
> at
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
> at
> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
> at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
> at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)
> 
> 
> 
> 
> 
> 6 我也尝试使用了nbsp;
> select b.netstruct_id fromnbsp; OMstream asnbsp; a left join
> sqlStream as b on a.ip gt; b.start_ip
> 同样是单个大小比较也是不可以的nbsp;
> 
> 
> 谢谢!
> 
> 
> 
> 
> --nbsp;原始邮件nbsp;--
> 发件人:nbsp;"Benchao Li" 发送时间:nbsp;2020年6月9日(星期二) 下午4:37
> 收件人:nbsp;"user-zh" 
> 主题:nbsp;Re: 关于flinksql between问题
> 
> 
> 
> 方便补充一下以下信息么?
> 1. 你使用的Flink的版本?
> 2. 使用的planner,是blink planner还是old planner?
> 3. 用的是streaming mode还是batch mode?
> 4. 具体的报错信息是什么?
> 
> 小屁孩 <932460...@qq.comgt; 于2020年6月9日周二 下午4:26写道:
> 
> gt; hi,我在flinksql中使用 select * from a join b on a.ip  a.ip
> gt; amp;gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用


?????? ?????? ????flinksql between????

2020-06-09 文章 ??????
hi, ?? ??joinmysql 
sourcemysql




----
??:"wangweigu...@stevegame.cn"

Re: 流groupby

2020-06-09 文章 Benchao Li
不用窗口的group by,一般推荐设置上state retention时间[1]。默认是不会做状态清理的,所以时间长了状态就会特别多。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

李奇 <359502...@qq.com> 于2020年6月10日周三 上午8:40写道:

> 可以不设置窗口,直接用你的字段,我们就有很多没有窗口的业务场景,但是这样会比较耗内存。如果时间太长,也可能导致oom
>
> > 在 2020年6月9日,下午12:24,allanqinjy  写道:
> >
> > hi,
> >   也就是指定 update-model retract就可以了是吧?好的多谢,我试试!
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >> 在 2020-06-09 12:13:10,"1048262223" <1048262...@qq.com> 写道:
> >> Hi
> >>
> >>
> >> 可以不开窗口只不过结果是retract流而不是append流
> >>
> >>
> >> Best,
> >> Yichao Yang
> >>
> >>
> >>
> >>
> >>
> >> 发自我的iPhone
> >>
> >>
> >> -- 原始邮件 --
> >> 发件人: allanqinjy  >> 发送时间: 2020年6月9日 12:11
> >> 收件人: user-zh  >> 主题: 回复:流groupby
>
>


Re: flink如何传递全局变量

2020-06-09 文章 Px New
对
正如 -> 1048262223  所说的一样 , 目前我就是通过BroadCast 动态更细一些规则带到下游并在Process method 中
进行操作 | 

zjfpla...@hotmail.com  于2020年6月9日周二 下午8:14写道:

> hi,
> 请问flink如何传递全局变量,静态类好像服务器端运行不行。
> 场景是:一开始flink程序起来时,读取配置文件中的配置项,此类配置项可能会在sink,source等等其他地方用到,算是整个程序的全局配置
>
>
>
> zjfpla...@hotmail.com
>


Re: Flink sql 中 无法通过 TableEnvironment 调出 createTemporaryTable()方法 以及.TableException: findAndCreateTableSource failed 异常

2020-06-09 文章 Px New
Hi *Benchao Li*   Thanks ,你说的很对 我现在已经走在了sql的实践道路上(还好有你指出)

Benchao Li  于2020年6月9日周二 上午10:05写道:

> Hi,
> 我看你用的是1.9.1版本,但是createTemporaryTable应该是在1.10之后才引入的。不知道你参考的是哪一版的文档呢?
>
> Px New <15701181132mr@gmail.com> 于2020年6月8日周一 下午10:00写道:
>
> > Hi 社区:  关于flink sql 使用上的一个问题以及一个sql异常
> > 
> > 我通过官网给出的结构编写代码时发现注册临时表方法无法被调用?[图1, 图 2, 图 3]
> > 通过 tableEnvironment 调用createTemporaryTable 方法
> >
> > 我排查过。 但还是没能解决
> > 1:包倒入的是官网所声明的包。
> > 2:类倒入的是 flink.table.api.TableEnvironment/以及.java.StreamTableEnvironment
> 两个类
> >
> >
> >  图 1 (依赖导入):
> > https://imgkr.cn-bj.ufileos.com/941dbd86-34f4-4579-a53d-f7c04439d6f0.PNG
> >  图 2 (import *):
> > https://imgkr.cn-bj.ufileos.com/e19b90f7-ef60-42d8-a93e-d65c4269e053.png
> >  图 3 (无法调用?):
> > https://imgkr.cn-bj.ufileos.com/579e5336-503c-490f-83b9-ebb46bd1b568.png
> >  图 4 (官网格式):
> > https://imgkr.cn-bj.ufileos.com/fcc7365e-aecf-49a3-94e1-ae64ee67122e.png
> > 图5 (TableException: findAndCreateTableSource failed 异常)
> > https://imgkr.cn-bj.ufileos.com/386a783a-a42e-4466-97f8-540adb524be0.PNG
> > 以及:
> > https://imgkr.cn-bj.ufileos.com/9cac88cc-7601-4b03-ade1-f2432c84adac.PNG
> >
>
>
> --
>
> Best,
> Benchao Li
>


?????? ??????flink????????????????

2020-06-09 文章 1048262223
Hi


Broadcast[1]??open??open??




[1]https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html


Best,
Yichao Yang


----
??:"zjfpla...@hotmail.com"

????: ??????flink????????????????

2020-06-09 文章 zjfpla...@hotmail.com
??



zjfpla...@hotmail.com
 
 1048262223
?? 2020-06-09 20:17
 user-zh
?? ??flink
Hi
 
 
rich functionopenbroadcast??
 
 
Best,
Yichao Yang
 
 
 
 
----
??:"zjfpla...@hotmail.com"

Re: 流groupby

2020-06-09 文章 李奇
可以不设置窗口,直接用你的字段,我们就有很多没有窗口的业务场景,但是这样会比较耗内存。如果时间太长,也可能导致oom

> 在 2020年6月9日,下午12:24,allanqinjy  写道:
> 
> hi,
>   也就是指定 update-model retract就可以了是吧?好的多谢,我试试!
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>> 在 2020-06-09 12:13:10,"1048262223" <1048262...@qq.com> 写道:
>> Hi
>> 
>> 
>> 可以不开窗口只不过结果是retract流而不是append流
>> 
>> 
>> Best,
>> Yichao Yang
>> 
>> 
>> 
>> 
>> 
>> 发自我的iPhone
>> 
>> 
>> -- 原始邮件 --
>> 发件人: allanqinjy > 发送时间: 2020年6月9日 12:11
>> 收件人: user-zh > 主题: 回复:流groupby



Re: flink JobManager HA 异常 the fencing token is null

2020-06-09 文章 tison
噢,那应该就是上面说的问题了

你的 Dispatcher 能被发现说明一开始选主和发布是 ok 的,你可以贴一下 HA
的配置,看看有没特别不靠谱的,然后去日志里找一下丢 leadership 的日志,一般来说前后会有一堆 zk 链接 ConnectionLoss 或者
SessionExpire 的日志

Best,
tison.


whirly  于2020年6月9日周二 下午9:23写道:

> Flink 1.8
>
>
>
>
> | |
> whirly
> |
> |
> 邮箱:whir...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年06月09日 21:15,tison 写道:
> 啥 flink 版本啊?1.10 Dispatcher 魔改之后应该不会 null 的
>
> Best,
> tison.
>
>
> whirly  于2020年6月9日周二 下午8:58写道:
>
> > 大家好:
> > 环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?
> >
> >
> > 异常信息:
> > Internal server error.,
> >  > side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
> >  Fencing token not set: Ignoring message
> > LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
> > LocalRpcInvocation(requestMultipleJobDetails(Time)))
> > sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because
> the
> > fencing token is null.
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> > at
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > End of exception on server side>
>


Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

2020-06-09 文章 方盛凯
可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

如有错误,欢迎补充回答。

陈赋赟  于2020年6月8日周一 上午11:53写道:

> 原先sql任务是:
> CREATE TABLE A_source(...)
> CREATE TABLE B_sink (...)
> INSERT INTO B_sink
> SELECT
>  1
> FROM
> A_source
> ;
> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
>
>
> CREATE TABLE A_source(...)
> CREATE TABLE B_sink (...)
> CREATE TABLE C_source(...)
> CREATE TABLE D_sink (...)
> INSERT INTO B_sink
> SELECT
>  1
> FROM
> A_source
> ;
>
>
> INSERT INTO C_sink
> SELECT
>  1
> FROM
> D_source
> ;
> 并基于Savepoint提交,结果显示
>
> Cannot map checkpoint/savepoint state for operator
> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
> is not available in the new program.
> If you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.
>
>
> 想请教一下底层是因为什么原因导致了opertor匹配不上?


回复:flink JobManager HA 异常 the fencing token is null

2020-06-09 文章 whirly
Flink 1.8




| |
whirly
|
|
邮箱:whir...@163.com
|

签名由 网易邮箱大师 定制

在2020年06月09日 21:15,tison 写道:
啥 flink 版本啊?1.10 Dispatcher 魔改之后应该不会 null 的

Best,
tison.


whirly  于2020年6月9日周二 下午8:58写道:

> 大家好:
> 环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?
>
>
> 异常信息:
> Internal server error.,
>  side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>  Fencing token not set: Ignoring message
> LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
> LocalRpcInvocation(requestMultipleJobDetails(Time)))
> sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because the
> fencing token is null.
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> End of exception on server side>


Re: flink JobManager HA 异常 the fencing token is null

2020-06-09 文章 tison
你可以详细说一下场景,这个我想了一下应该是你选举窗口太长了

0. 某个时候,Dispatcher 选出了 Leader 并发布自己的地址
1. 某个组件向 Dispatcher 发了个消息,你这里前端点击之后后端 WebMonitor 给 Dispatcher 发
requestMultipleJobDetails
消息
2. Dispatcher 跟 zk 链接抖动,丢 leader 了。早期版本会把这个 fencing token 设置成 null
3. 1 里面的消息到达 Dispatcher,Dispatcher 走 fencing token 逻辑,看到是 null
4. 抛出此异常

如果稍后又选举成功,这里的异常应该是 fencing token mismatch 一类的

Best,
tison.


tison  于2020年6月9日周二 下午9:15写道:

> 啥 flink 版本啊?1.10 Dispatcher 魔改之后应该不会 null 的
>
> Best,
> tison.
>
>
> whirly  于2020年6月9日周二 下午8:58写道:
>
>> 大家好:
>> 环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?
>>
>>
>> 异常信息:
>> Internal server error.,
>> > side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>>  Fencing token not set: Ignoring message
>> LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
>> LocalRpcInvocation(requestMultipleJobDetails(Time)))
>> sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because
>> the fencing token is null.
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> End of exception on server side>
>
>


Re: flink JobManager HA 异常 the fencing token is null

2020-06-09 文章 tison
啥 flink 版本啊?1.10 Dispatcher 魔改之后应该不会 null 的

Best,
tison.


whirly  于2020年6月9日周二 下午8:58写道:

> 大家好:
> 环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?
>
>
> 异常信息:
> Internal server error.,
>  side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>  Fencing token not set: Ignoring message
> LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
> LocalRpcInvocation(requestMultipleJobDetails(Time)))
> sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because the
> fencing token is null.
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> End of exception on server side>


flink JobManager HA 异常 the fencing token is null

2020-06-09 文章 whirly
大家好:
环境一些作业出现下面异常,怀疑是Flink JobManager配置HA的问题,请问可能是什么问题?


异常信息:
Internal server error.,


Re: Flink新出的OrcBulkWriterFactory有没有大佬给个详细的Demo

2020-06-09 文章 Yun Gao
Hello,

例子的话可以参考一下Flink文档 [1] 和相关的测试case [2] 。

   现在跑的作业有开checkpoint么?另外,这个作业是一个无限数据流的作业还是一个有限数据流的作业?



 [1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html
 找Orc Format一节,这个doc页现在格式坏了,社区在修
 [2]  
https://github.com/apache/flink/blob/master/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java


--
Sender:夏帅
Date:2020/06/08 17:28:08
Recipient:user-zh
Theme:Flink新出的OrcBulkWriterFactory有没有大佬给个详细的Demo

自己在使用时,会有文件生成,但是文件内并不包含数据


Re: 请问 StreamingFileSink如何写数据到其它HA的Hadoop集群,并且是yarn job

2020-06-09 文章 Yun Gao
Hello,

 就是现在有没有遇到具体的错误?我理解应该需要在flink 
TM所运行机器上的HADOOP_CONF_DIR底下的hdfs.site配置一些参数。可能可以参考之前的提问:[1]


   [1] 
http://apache-flink.147419.n8.nabble.com/How-to-write-stream-data-to-other-Hadoop-Cluster-by-StreamingFileSink-td792.html


--
Sender:苯鵝飛啊飛
Date:2020/06/09 20:11:58
Recipient:user-zh
Theme:请问 StreamingFileSink如何写数据到其它HA的Hadoop集群,并且是yarn job

你好,如题:
请问 StreamingFileSink如何写数据到其它HA的Hadoop集群,并且是yarn job


感谢!

莫等闲 白了少年头
=
Mobile:18611696624
QQ:79434564





??????flink????????????????

2020-06-09 文章 1048262223
Hi


rich functionopenbroadcast??


Best,
Yichao Yang




----
??:"zjfpla...@hotmail.com"

flink如何传递全局变量

2020-06-09 文章 zjfpla...@hotmail.com
hi,
请问flink如何传递全局变量,静态类好像服务器端运行不行。
场景是:一开始flink程序起来时,读取配置文件中的配置项,此类配置项可能会在sink,source等等其他地方用到,算是整个程序的全局配置



zjfpla...@hotmail.com


???? StreamingFileSink????????????????HA??Hadoop????,??????yarn job

2020-06-09 文章 ???Z?w???w

 StreamingFileSinkHA??Hadoop,??yarn job


??

?? ??
=
Mobile??18611696624
QQ:79434564




????: ?????? ????flinksql between????

2020-06-09 文章 wangweigu...@stevegame.cn

  1.10 useBlinkPlanneruseOldPlanner
  
??
  Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 

LogicalProject(num=[$0])
  LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner])
FlinkLogicalDataStreamScan(id=[1], fields=[num])
FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])

This exception indicates that the query uses an unsupported SQL feature.




 
 ??
?? 2020-06-09 17:41
 user-zh
?? ?? flinksql between
hi
1 flink1.9.0
2 oldplanner


??????Flink State ?????????? state ????????????

2020-06-09 文章 1048262223
Hi


[1]


[1]https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/schema_evolution.html#pojo-%E7%B1%BB%E5%9E%8B


Best,
Yichao Yang




----
??:"wangl...@geekplus.com.cn"

?????? ??????Flink State ?????????? state ????????????

2020-06-09 文章 1048262223
Hi


TypeInformation a = 
TypeInformation.of(OrderState.class);??pojopojo??


Best,
Yichao Yang





----
??:"wangl...@geekplus.com.cn"

??????????????????

2020-06-09 文章 1048262223
Hi


??jobmysql??

??jobmysql??job





Best,
Yichao Yang




----
??:"wangxiangyan"

????: ??????Flink State ?????????? state ????????????

2020-06-09 文章 wangl...@geekplus.com.cn

OrderState 
 get set ?? 

 flink ??OrderState??



wang...@geekplus.com.cn

 1048262223
?? 2020-06-09 18:11
 user-zh
?? ??Flink State ?? state 
Hi
 
 
flink??OrderStatepojo??savepoint??
 
Best,
Yichao Yang
 
 
 
 
----
??:"wangl...@geekplus.com.cn"

??????Flink State ?????????? state ????????????

2020-06-09 文章 1048262223
Hi


flink??OrderStatepojo??savepoint??

Best,
Yichao Yang




----
??:"wangl...@geekplus.com.cn"

Flink State 增加字段后 state 还能识别吗?

2020-06-09 文章 wangl...@geekplus.com.cn

写了个简单的类会在 Flink State 中使用:

public class OrderState {
private Integer warehouseId;
private String orderNo;
private String ownerCode;
private Long inputDate;
private int orderType;
private int amount = 0;
private int status = 0;
.
}


现在程序要升级,这个类还要增加一个新的字段。从state 能正常恢复吗?
也就是 flink run -s   savepointdir   后能正常识别旧的代码保存的 state 吗?

谢谢,
王磊



wangl...@geekplus.com.cn


Re:延迟事件处理

2020-06-09 文章 wangxiangyan
1. 
指标统计展示是mysql,按照事件时间做窗口的统计,如果按照处理时间,需要找到数据所属的窗口实现外部系统的更新,但在前台页面可能获取不到最近几分钟的统计数据,此时事件时间也同样延迟,语义上不如事件事件解释性强一些


2. 白天运行的时候显示实时的数据,晚上去更正一整天的数据,资源消耗很大,不确定这种延迟的频率,应该也不会常出现吧


还是将延迟数据收集起来,另外启动一个流处理任务不断消费延迟数据,通过和mysql交互,将统计出的指标和之前窗口统计出的指标求和然后更新,这个逻辑似乎比较合理
--Original--
From: "1048262223"<1048262...@qq.com;
Date: Tue, Jun 9, 2020 05:40 PM
To: "user-zh"

?????? ????flinksql between????

2020-06-09 文章 ??????
hi
1 flink1.9.0
2 oldplanner


??????????????????

2020-06-09 文章 1048262223
Hi


??
1.olap??olap
2. t - 1 
??


Best,
Yichao Yang




----
??:"wangxiangyan"

Re: pyflink数据查询

2020-06-09 文章 Jeff Zhang
可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
https://www.bilibili.com/video/BV1Te411W73b?p=20
可以加入钉钉群讨论:30022475



jack  于2020年6月9日周二 下午5:28写道:

> 问题请教:
> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>
> flink能否实现这样的方式?
> 感谢
>


-- 
Best Regards

Jeff Zhang


Re: Re:延迟事件处理

2020-06-09 文章 wangweigu...@stevegame.cn

觉得对于下线产生的历史数据,用批处理应该更好一点,可以避免数据量过大造成的问题!


 
发件人: wangxiangyan
发送时间: 2020-06-09 17:26
收件人: user-zh
主题: Re:延迟事件处理
这是一个需要实时展示统计指标的系统,数据来源于检测器,检测器安装在客户那边,可能有下线的状态,或者数据延迟到达,不确定下线的时间,某个检测器下线之后在第二天上线会有一批昨天的数据,会发生延迟的数据处理


--Original--
From: "1048262223"<1048262...@qq.com;
Date: Tue, Jun 9, 2020 05:14 PM
To: "user-zh"

??????????????????????????????????

2020-06-09 文章 kcz
sorry??




----
??:"1048262223"<1048262...@qq.com;
:2020??6??9??(??) 5:07
??:"user-zh"

pyflink数据查询

2020-06-09 文章 jack
问题请教:
描述: pyflink 从source通过sql对数据进行查询聚合等操作 
不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。


flink能否实现这样的方式?
感谢

Re:延迟事件处理

2020-06-09 文章 wangxiangyan
这是一个需要实时展示统计指标的系统,数据来源于检测器,检测器安装在客户那边,可能有下线的状态,或者数据延迟到达,不确定下线的时间,某个检测器下线之后在第二天上线会有一批昨天的数据,会发生延迟的数据处理


--Original--
From: "1048262223"<1048262...@qq.com;
Date: Tue, Jun 9, 2020 05:14 PM
To: "user-zh"

Re: Flink SQL UDF 动态类型

2020-06-09 文章 Jingsong Li
Hi all,

业务上一般是可以避免动态类型的UDF的,如果有刚需,1.11已经支持了[1],文档还在路上,一个简单的例子根据第一个参数来推断返回类型:

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
   return TypeInference.newBuilder()
  .outputTypeStrategy(TypeStrategies.argument(0))
  .build();
}


[1]https://issues.apache.org/jira/browse/FLINK-15487

Best,
Jingsong Lee

On Tue, Jun 9, 2020 at 4:57 PM kcz <573693...@qq.com> wrote:

> 有道理呀,我直接map就好 。tks。
>
>
>
>
> --原始邮件--
> 发件人:"1048262223"<1048262...@qq.com;
> 发送时间:2020年6月9日(星期二) 下午4:51
> 收件人:"user-zh"
> 主题:回复: Flink SQL UDF 动态类型
>
>
>
> Hi
>
>
> 这个可以通过返回一个通用类型比如Map来实现。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --nbsp;原始邮件nbsp;--
> 发件人:nbsp;"kcz"<573693...@qq.comgt;;
> 发送时间:nbsp;2020年6月9日(星期二) 下午4:49
> 收件人:nbsp;"user-zh"
> 主题:nbsp;回复: Flink SQL UDF 动态类型
>
>
>
> 动态类型这个我们其实也挺需要的,比如我写一个列转行的udf,这个时候我需要确认返回的字段个数以及返回的类型,如果增加字段等,就需要更多udf来实现。
>
>
>
>
> --amp;nbsp;原始邮件amp;nbsp;--
> 发件人:amp;nbsp;"Benchao Li" 发送时间:amp;nbsp;2020年6月9日(星期二) 下午2:47
> 收件人:amp;nbsp;"user-zh"
> 主题:amp;nbsp;Re: Flink SQL UDF 动态类型
>
>
>
> 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
> 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
> 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。
>
> 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
> 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。
>
> 1048262223 <1048262...@qq.comamp;gt; 于2020年6月9日周二 下午2:23写道:
>
> amp;gt; Hi
> amp;gt;
> amp;gt;
> amp;gt;
> amp;gt;
> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
> amp;gt;
> amp;gt;
> amp;gt;
> amp;gt;
> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
> amp;gt;
> amp;gt;
> amp;gt; 如果有理解不对之处,敬请指出。
> amp;gt;
> amp;gt;
> amp;gt; Best,
> amp;gt; Yichao Yang
> amp;gt;
> amp;gt;
> amp;gt;
> amp;gt;
> amp;gt;
> --amp;amp;nbsp;原始邮件amp;amp;nbsp;--
> amp;gt; 发件人:amp;amp;nbsp;"forideal" amp;amp;gt;;
> amp;gt; 发送时间:amp;amp;nbsp;2020年6月9日(星期二) 中午1:33
> amp;gt; 收件人:amp;amp;nbsp;"user-zh" amp;amp;gt;;
> amp;gt;
> amp;gt; 主题:amp;amp;nbsp;Flink SQL UDF 动态类型
> amp;gt;
> amp;gt;
> amp;gt;
> amp;gt; 你好,我的朋友:
> amp;gt;
> amp;gt;
> amp;gt;
> amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
> 我使用的是 Flink 1.10 Blink Planer。
> amp;gt;
> amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
> 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
> amp;gt;
> amp;gt;
> amp;gt;
> amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
> 为什么我想要这个功能:
> amp;gt;
> amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
> 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
> amp;gt; string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string
> 这样的方式,实现起来又非常多
> amp;gt;
> amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
> 场景2: 我的数据是一个 Json ,问题同上。
> amp;gt; amp;amp;nbsp;
> amp;gt;
> amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
> 在场景1中,我改了下 Flink 的源码,在 ScalarFunction
> amp;gt; 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
> amp;gt; @Override
> amp;gt; public void initialize(LogicalType[] sqlTypes, String[]
> paramNames) {
> amp;gt; // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema
> 信息,拿到类型信息,这样就可以动态的设置类型
> amp;gt; }
> amp;gt; amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
> 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
> amp;gt; amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp; 这个case
> 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
> amp;gt; 这个类型不进行 cast 是无法直接使用的。
> amp;gt; public class TimestampTest extends ScalarFunction {
> amp;gt;
> amp;gt; public Object eval(long timestamp, String pattern, int num) {
> amp;gt;
> amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
> Timestamp timestamp1 = new
> amp;gt; Timestamp(timestamp);
> amp;gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern);
> amp;gt;
> amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
> if (num < 4) {
> amp;gt; //返回 STRING 类型
> amp;gt; return String.valueOf(timestamp);
> amp;gt; }
> amp;gt; if (num < 6) {
> amp;gt; //返回 BIGINT
> amp;gt; return timestamp - 100;
> amp;gt; }
> amp;gt; if (num < 8) {
> amp;gt; //返回 DOUBLE
> amp;gt; double ss = 0.9;
> amp;gt;
> amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;amp;amp;nbsp;
> return
> amp;gt; (double) timestamp + ss;
> amp;gt; }
> amp;gt; //返回 STRING
> amp;gt; return sdf.format(timestamp1);
> amp;gt; }
> amp;gt; }



-- 
Best, Jingsong Lee


Re: Re: flink1.10 on yarn 问题

2020-06-09 文章 wangweigu...@stevegame.cn

是的,yarn-session模式,如果没有提交任务到yarn-session的applictionid中,是不会分配任何的slot和内存!


 
发件人: Benchao Li
发送时间: 2020-06-08 18:26
收件人: user-zh
主题: Re: Re: flink1.10 on yarn 问题
yarn session模式应该是lazy的,你不提交任务,它不会真正的去申请container。
 
小屁孩 <932460...@qq.com> 于2020年6月8日周一 下午6:23写道:
 
>
> 这个问题已经解决 是我自己的cdh的namenode没有启动成功,我目前有个疑问
>
> 命令:./../bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -d -s 2
> flink用这个命令申请的应该是 2个slot  为什么通过yarn 的ui界面 没有看到slot数
>
>
> 能不能帮我解答一下 谢谢
> -- 原始邮件 --
> *发件人:* "小屁孩"<932460...@qq.com>;
> *发送时间:* 2020年6月8日(星期一) 下午4:06
> *收件人:* "user-zh";
> *主题:* 回复: Re: flink1.10 on yarn 问题
>
> hello
> 命令 ./yarn-session.sh -n 8 -jm 1024 -tm 1024 -s 4 -nm FlinkOnYarnSession -d
> 版本 :flink1.10.0  CDH5.14
> 我在使用flink on yarn的yarn-session 模式时报错如下
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn session cluster
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:380)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:548)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
> Caused by: java.net.ConnectException: Call From master/192.168.1.20 to
> slave1:8020 failed on connection exception: java.net.ConnectException:
> Connection refused; For more details see:
> http://wiki.apache.org/hadoop/ConnectionRefused
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791)
> at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:731)
> at org.apache.hadoop.ipc.Client.call(Client.java:1474)
> at org.apache.hadoop.ipc.Client.call(Client.java:1401)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy13.getFileInfo(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
> at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:496)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:348)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
> at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1907)
> at org.apache.flink.yarn.Utils.uploadLocalFileToRemote(Utils.java:172)
> at org.apache.flink.yarn.Utils.setupLocalResource(Utils.java:126)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.setupSingleLocalResource(YarnClusterDescriptor.java:1062)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.uploadAndRegisterFiles(YarnClusterDescriptor.java:1144)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:707)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:488)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:373)
> ... 7 more
> Caused by: java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
> at 

延迟事件处理

2020-06-09 文章 wangxiangyan
hi,有个问题请教大家

在使用中遇到的需求是,按分钟处理数据,数据源是不稳定的,可能会一段时间内下线,比如第二天前一天的数据大量涌入,可能的选择方案有
1.延迟数据处理:将延迟数据采取另外的逻辑处理与外部系统交互,但是允许延迟的状态存储是不是需要调节为一天时间
2.每天晚上定时使用批处理重新计算白天的数据去校正

应该使用哪种方式或者使用更好的方式去处理呢?

??????????????????

2020-06-09 文章 1048262223
Hi



??flink


Best,
Yichao Yang




----
??:"wangxiangyan"

延迟事件处理

2020-06-09 文章 wangxiangyan
hi
在使用中遇到的需求是,按分钟处理数据,数据源是不稳定的,可能会一段时间内下线,比如第二天前一天的数据大量涌入,可能的选择方案有
1.延迟数据处理:将延迟数据采取另外的逻辑处理与外部系统交互,但是允许延迟的状态存储是不是需要调节为一天时间
2.每天晚上定时使用批处理重新计算白天的数据去校正

应该使用哪种方式或者使用更好的方式去处理呢?

flink延迟数据处理

2020-06-09 文章 wangxiangyan
hi
在使用中遇到的需求是,按分钟处理数据,数据源是不稳定的,可能会一段时间内下线,比如第二天前一天的数据大量涌入,可能的选择方案有
1.延迟数据处理:将延迟数据采取另外的逻辑处理与外部系统交互,但是允许延迟的状态存储是不是需要调节为一天时间
2.每天晚上定时使用批处理重新计算白天的数据去校正

应该使用哪种方式或者使用更好的方式去处理呢?

??????????????????????????????????

2020-06-09 文章 1048262223
Hi


??


Best
Yichao Yang




----
??:"kcz"<573693...@qq.com;
:2020??6??9??(??) 5:06
??:"user-zh"

????????????????????????????

2020-06-09 文章 kcz
join??open
??

?????? Flink SQL UDF ????????

2020-06-09 文章 kcz
map ??tks??




----
??:"1048262223"<1048262...@qq.com;
:2020??6??9??(??) 4:51
??:"user-zh"

?????? Flink SQL UDF ????????

2020-06-09 文章 1048262223
Hi


Map


Best,
Yichao Yang




----
??:"kcz"<573693...@qq.com;
:2020??6??9??(??) 4:49
??:"user-zh"

?????? Flink SQL UDF ????????

2020-06-09 文章 kcz
udfudf




----
??:"Benchao Li"

Re: 关于flinksql between问题

2020-06-09 文章 Benchao Li
方便补充一下以下信息么?
1. 你使用的Flink的版本?
2. 使用的planner,是blink planner还是old planner?
3. 用的是streaming mode还是batch mode?
4. 具体的报错信息是什么?

小屁孩 <932460...@qq.com> 于2020年6月9日周二 下午4:26写道:

> hi,我在flinksql中使用 select * from a join b on a.ip  b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用


????flinksql between????

2020-06-09 文章 ??????
hi??flinksql?? select * from a join b on a.ip 

Re:Re: Flink SQL UDF 动态类型

2020-06-09 文章 forideal
+1 to support pb format



如果能支持 pb format 那简直太好了,实际了我们也自己搞了一个 pb 
format。大概的方法也是在外边做了一个对应的service,这个service保存了一个jar,在进行parse byte 
的时候,采用了urlclassload+反射调用 parse 类型的方法。
同时也尝试过使用 dynamic message 的方式,这个方式更轻量一些,但是,性能差强人意。











在 2020-06-09 14:49:02,"Jark Wu"  写道:
>+1 to support pb format.
>
>On Tue, 9 Jun 2020 at 14:47, Benchao Li  wrote:
>
>> 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
>> 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
>> 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。
>>
>> 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
>> 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。
>>
>> 1048262223 <1048262...@qq.com> 于2020年6月9日周二 下午2:23写道:
>>
>> > Hi
>> >
>> >
>> >
>> >
>> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
>> >
>> >
>> >
>> >
>> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
>> >
>> >
>> > 如果有理解不对之处,敬请指出。
>> >
>> >
>> > Best,
>> > Yichao Yang
>> >
>> >
>> >
>> >
>> > --原始邮件--
>> > 发件人:"forideal"> > 发送时间:2020年6月9日(星期二) 中午1:33
>> > 收件人:"user-zh"> >
>> > 主题:Flink SQL UDF 动态类型
>> >
>> >
>> >
>> > 你好,我的朋友:
>> >
>> >
>> >  我使用的是 Flink 1.10 Blink Planer。
>> >  我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
>> >
>> >
>> >  为什么我想要这个功能:
>> >  场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
>> > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
>> >  场景2: 我的数据是一个 Json ,问题同上。
>> > 
>> >  在场景1中,我改了下 Flink 的源码,在 ScalarFunction
>> > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
>> > @Override
>> > public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
>> > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
>> > }
>> >  这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
>> >  这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
>> > 这个类型不进行 cast 是无法直接使用的。
>> > public class TimestampTest extends ScalarFunction {
>> >
>> > public Object eval(long timestamp, String pattern, int num) {
>> >  Timestamp timestamp1 = new
>> > Timestamp(timestamp);
>> > SimpleDateFormat sdf = new SimpleDateFormat(pattern);
>> >  if (num < 4) {
>> > //返回 STRING 类型
>> > return String.valueOf(timestamp);
>> > }
>> > if (num < 6) {
>> > //返回 BIGINT
>> > return timestamp - 100;
>> > }
>> > if (num < 8) {
>> > //返回 DOUBLE
>> > double ss = 0.9;
>> >  return
>> > (double) timestamp + ss;
>> > }
>> > //返回 STRING
>> > return sdf.format(timestamp1);
>> > }
>> > }
>>


Re: Flink SQL UDF 动态类型

2020-06-09 文章 Benchao Li
FYI: issue[1] 已经建好了,各位感兴趣的可以关注一下,也非常欢迎参与设计和实现~

[1] https://issues.apache.org/jira/browse/FLINK-18202

1048262223 <1048262...@qq.com> 于2020年6月9日周二 下午2:54写道:

> Hi
>
>
>
> +1,各位大佬,其实我自己已经通过参考avro,json等format实现基于pb实现了一个flink-protobuf解析的formats,git地址如下
> https://github.com/yangyichao-mango/flink-protobuf
> 之后我会持续关注社区关于pb format的实现。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"Jark Wu" 发送时间:2020年6月9日(星期二) 下午2:49
> 收件人:"user-zh"
> 主题:Re: Flink SQL UDF 动态类型
>
>
>
> +1 to support pb format.
>
> On Tue, 9 Jun 2020 at 14:47, Benchao Li 
>  我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
>  我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
>  这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。
> 
>  之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
>  我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。
> 
>  1048262223 <1048262...@qq.com 于2020年6月9日周二 下午2:23写道:
> 
>   Hi
>  
>  
>  
>  
> 
> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
>  
>  
>  
>  
> 
> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
>  
>  
>   如果有理解不对之处,敬请指出。
>  
>  
>   Best,
>   Yichao Yang
>  
>  
>  
>  
>   --nbsp;原始邮件nbsp;--
>   发件人:nbsp;"forideal"   发送时间:nbsp;2020年6月9日(星期二) 中午1:33
>   收件人:nbsp;"user-zh"  
>   主题:nbsp;Flink SQL UDF 动态类型
>  
>  
>  
>   你好,我的朋友:
>  
>  
>   nbsp;nbsp;nbsp;nbsp;nbsp; 我使用的是 Flink
> 1.10 Blink Planer。
>   nbsp;nbsp;nbsp;nbsp;nbsp; 我想构造一个Flink
> UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
>  
>  
>   nbsp;nbsp;nbsp;nbsp;nbsp; 为什么我想要这个功能:
>   nbsp;nbsp;nbsp;nbsp;nbsp; 场景1: 我的数据是一个
> pb 的 bytes,我想从里面获取数据,如果统一的返回
>   string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string
> 这样的方式,实现起来又非常多
>   nbsp;nbsp;nbsp;nbsp; 场景2: 我的数据是一个 Json ,问题同上。
>   nbsp;
>   nbsp;nbsp;nbsp;nbsp; 在场景1中,我改了下 Flink 的源码,在
> ScalarFunction
>   中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
>   @Override
>   public void initialize(LogicalType[] sqlTypes, String[]
> paramNames) {
>   // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema
> 信息,拿到类型信息,这样就可以动态的设置类型
>   }
>   nbsp;nbsp;nbsp; 这个方法很有效果,他帮我们 workaround
> 了一段时间,目前依然work。只是有些不是那么优雅。
>   nbsp;nbsp;nbsp; 这个case 就是我想要的一个,不过,目前这个会返回
> RAW('java.lang.Object', ?)
>   这个类型不进行 cast 是无法直接使用的。
>   public class TimestampTest extends ScalarFunction {
>  
>   public Object eval(long timestamp, String pattern, int num) {
>  
> nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> Timestamp timestamp1 = new
>   Timestamp(timestamp);
>   SimpleDateFormat sdf = new SimpleDateFormat(pattern);
>  
> nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; if
> (num < 4) {
>   //返回 STRING 类型
>   return String.valueOf(timestamp);
>   }
>   if (num < 6) {
>   //返回 BIGINT
>   return timestamp - 100;
>   }
>   if (num < 8) {
>   //返回 DOUBLE
>   double ss = 0.9;
>  
> nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> return
>   (double) timestamp + ss;
>   }
>   //返回 STRING
>   return sdf.format(timestamp1);
>   }
>   }
> 


?????? Flink SQL UDF ????????

2020-06-09 文章 1048262223
Hi


+1??avro??json??formatpb??flink-protobuf??formats??git
https://github.com/yangyichao-mango/flink-protobuf
pb format


Best,
Yichao Yang




----
??:"Jark Wu"

Re: Flink SQL UDF 动态类型

2020-06-09 文章 Jark Wu
+1 to support pb format.

On Tue, 9 Jun 2020 at 14:47, Benchao Li  wrote:

> 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
> 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
> 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。
>
> 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
> 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。
>
> 1048262223 <1048262...@qq.com> 于2020年6月9日周二 下午2:23写道:
>
> > Hi
> >
> >
> >
> >
> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
> >
> >
> >
> >
> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
> >
> >
> > 如果有理解不对之处,敬请指出。
> >
> >
> > Best,
> > Yichao Yang
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:"forideal" > 发送时间:2020年6月9日(星期二) 中午1:33
> > 收件人:"user-zh" >
> > 主题:Flink SQL UDF 动态类型
> >
> >
> >
> > 你好,我的朋友:
> >
> >
> >  我使用的是 Flink 1.10 Blink Planer。
> >  我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
> >
> >
> >  为什么我想要这个功能:
> >  场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
> > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
> >  场景2: 我的数据是一个 Json ,问题同上。
> > 
> >  在场景1中,我改了下 Flink 的源码,在 ScalarFunction
> > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
> > @Override
> > public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
> > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
> > }
> >  这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
> >  这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
> > 这个类型不进行 cast 是无法直接使用的。
> > public class TimestampTest extends ScalarFunction {
> >
> > public Object eval(long timestamp, String pattern, int num) {
> >  Timestamp timestamp1 = new
> > Timestamp(timestamp);
> > SimpleDateFormat sdf = new SimpleDateFormat(pattern);
> >  if (num < 4) {
> > //返回 STRING 类型
> > return String.valueOf(timestamp);
> > }
> > if (num < 6) {
> > //返回 BIGINT
> > return timestamp - 100;
> > }
> > if (num < 8) {
> > //返回 DOUBLE
> > double ss = 0.9;
> >  return
> > (double) timestamp + ss;
> > }
> > //返回 STRING
> > return sdf.format(timestamp1);
> > }
> > }
>


Re: Flink SQL UDF 动态类型

2020-06-09 文章 Benchao Li
我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。

之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。

1048262223 <1048262...@qq.com> 于2020年6月9日周二 下午2:23写道:

> Hi
>
>
>
> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
>
>
>
> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
>
>
> 如果有理解不对之处,敬请指出。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"forideal" 发送时间:2020年6月9日(星期二) 中午1:33
> 收件人:"user-zh"
> 主题:Flink SQL UDF 动态类型
>
>
>
> 你好,我的朋友:
>
>
>  我使用的是 Flink 1.10 Blink Planer。
>  我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
>
>
>  为什么我想要这个功能:
>  场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
> string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
>  场景2: 我的数据是一个 Json ,问题同上。
> 
>  在场景1中,我改了下 Flink 的源码,在 ScalarFunction
> 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
> @Override
> public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
> // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
> }
>  这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
>  这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
> 这个类型不进行 cast 是无法直接使用的。
> public class TimestampTest extends ScalarFunction {
>
> public Object eval(long timestamp, String pattern, int num) {
>  Timestamp timestamp1 = new
> Timestamp(timestamp);
> SimpleDateFormat sdf = new SimpleDateFormat(pattern);
>  if (num < 4) {
> //返回 STRING 类型
> return String.valueOf(timestamp);
> }
> if (num < 6) {
> //返回 BIGINT
> return timestamp - 100;
> }
> if (num < 8) {
> //返回 DOUBLE
> double ss = 0.9;
>  return
> (double) timestamp + ss;
> }
> //返回 STRING
> return sdf.format(timestamp1);
> }
> }


??????Flink SQL UDF ????????

2020-06-09 文章 1048262223
Hi


pbpb??schema(descriptor)??TypeInformationenv.addSource().returns()??TypeInformation??TypeInformation??


??udfudfudf??udfudf


??


Best,
Yichao Yang




----
??:"forideal"

??????Flink??????????????????

2020-06-09 文章 1048262223
Hi


1.try catch??
2.??
3.??try catch + ??perf log + 


Best,
Yichao Yang




----
??:"Z-Z"