Re: Re: flink sql fails to consume from Kafka

2020-06-09 Thread Zhou Zach
Thanks for the reply. After changing the timestamp written to Kafka to "2020-06-10T12:12:43Z", consumption succeeded.


On 2020-06-10 13:25:01, "Leonard Xu" wrote:
>Hi,
>
>> Caused by: java.io.IOException: Failed to deserialize JSON object.
>
>The error message says JSON parsing failed. Based on pitfalls others have already hit, please check two things:
>(1) The timestamp data in the JSON must be in the format "2020-06-10T12:12:43Z"; it cannot be a long value in milliseconds. There is already a community issue tracking this, not yet resolved.
>(2) Check whether the corresponding Kafka topic contains dirty data; "earliest-offset" starts consuming from the first record of the topic.
>
>Best,
>Leonard Xu


Re: Flink job cannot complete checkpoint snapshots and reports a Kafka exception

2020-06-09 Thread Zhefu PENG
Hi Yichao,

Thanks for your reply. This job has been running in production for about a week and the problem only showed up today; we will increase the interval and test. Also, as I just replied, I found some related logs on the TaskManager:
2020-06-10 12:44:40,688 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Pending record count must be zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be
zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
... 8 more

Could this still be related to the checkpoint interval being too short? Looking forward to your reply, thanks!

Best,
Zhefu

Yichao Yang <1048262...@qq.com> wrote on Wed, Jun 10, 2020, at 1:24 PM:

> Hi
>
>
>
> From the error, the job failed because the number of checkpoint failures exceeded the configured maximum. The checkpoint interval is set too small: on our team the interval is usually at the minute level (we typically use 5 minutes). Checkpointing is only a fault-tolerance mechanism; unless you have a special requirement there is no need for such a short interval, and frequent checkpoints also cause performance problems.
>
>
> Best,
> Yichao Yang
>
>
> -- Original message --
> From: Zhefu PENG  Sent: June 10, 2020 13:04
> To: user-zh  Subject: Re: Flink job cannot complete checkpoint snapshots and reports a Kafka exception
>
>
>
> Hi all,
>
> We have a simple Flink job. After chaining, the execution graph is roughly:
> Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter
> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
>
>
> After running in production for a while, it started to fail. The logs say checkpoints cannot be completed and also mention Kafka network and connection exceptions. However, other Flink jobs read and write against the same brokers without errors. Our tentative theory is that each checkpoint takes fairly long to complete (a few hundred milliseconds) while our checkpoint interval is very short (only one second), and this may be hurting the job's performance. This is only a rough guess and we have no good starting point for troubleshooting, so we would like to hear your opinions or suggestions. Many thanks.
>
> Part of the error log:
> 2020-06-10 12:02:49,083 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering
> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
> 2020-06-10 12:04:47,898 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Decline
> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> c41f4811262db1c4c270b136571c8201 at
> container_e27_1591466310139_21670_01_06 @
> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> 2020-06-10 12:04:47,899 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Discarding
> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
> snapshot 1 for operator Source: Custom Source -> Map -> Source_Map ->
> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map ->
> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
> at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at
>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
>
> 

Re: flink sql fails to consume from Kafka

2020-06-09 Thread Leonard Xu
Hi,

> Caused by: java.io.IOException: Failed to deserialize JSON object.

The error message says JSON parsing failed. Based on pitfalls others have already hit, please check two things:
(1) The timestamp data in the JSON must be in the format "2020-06-10T12:12:43Z"; it cannot be a long value in milliseconds. There is already a community issue tracking this, not yet resolved.
(2) Check whether the corresponding Kafka topic contains dirty data; "earliest-offset" starts consuming from the first record of the topic.

Best,
Leonard Xu
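For anyone hitting the same deserialization error, here is a minimal, hypothetical sketch of a Flink 1.10 Kafka/JSON source whose TIMESTAMP column expects the RFC-3339 form discussed above ("2020-06-10T12:12:43Z"). The topic, server and field names are invented; only the timestamp format and the earliest-offset startup mode mirror the advice in this thread.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class KafkaJsonTimestampExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        tEnv.sqlUpdate(
            "CREATE TABLE user_behavior (" +
            "  uid STRING," +
            // event_time must arrive as a string like "2020-06-10T12:12:43Z",
            // not as epoch milliseconds, for the 1.10 JSON format to parse it.
            "  event_time TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector.type' = 'kafka'," +
            "  'connector.version' = 'universal'," +
            "  'connector.topic' = 'user_behavior'," +
            "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
            // earliest-offset re-reads the topic from its first record,
            // so any old dirty records will surface again.
            "  'connector.startup-mode' = 'earliest-offset'," +
            "  'format.type' = 'json'" +
            ")");
    }
}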

Re: flink sql fails to consume from Kafka

2020-06-09 Thread Yichao Yang
Hi


From the error, it looks like there is dirty data in the Kafka topic.


Best,
Yichao Yang



Sent from my iPhone


-- Original message --
From: Zhou Zach 

Re: Flink job cannot complete checkpoint snapshots and reports a Kafka exception

2020-06-09 Thread Yichao Yang
Hi


From the error, the job failed because the number of checkpoint failures exceeded the configured maximum. The checkpoint interval is set too small: on our team the interval is usually at the minute level (we typically use 5 minutes). Checkpointing is only a fault-tolerance mechanism; unless you have a special requirement there is no need for such a short interval, and frequent checkpoints also cause performance problems.


Best,
Yichao Yang


-- Original message --
From: Zhefu PENG 
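A minimal sketch of the minute-level checkpoint configuration suggested above, assuming the DataStream API on Flink 1.10; the interval, timeout and tolerated-failure numbers are only illustrative.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 minutes instead of every second.
        env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        // Keep some breathing room between checkpoints and bound how long one may take.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
        // Allow a few failed checkpoints before the whole job is failed; a strict
        // threshold is what produces "Exceeded checkpoint tolerable failure threshold".
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // ... build the actual pipeline and call env.execute() here ...
    }
}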

Re: Flink job cannot complete checkpoint snapshots and reports a Kafka exception

2020-06-09 Thread Zhefu PENG
To add to this, I found the following error log on the TaskManager:

2020-06-10 12:44:40,688 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Pending record count must be zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be
zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
... 8 more

Hoping for some help, thanks!


Zhefu PENG wrote on Wed, Jun 10, 2020, at 1:03 PM:

> Hi all,
>
> We have a simple Flink job. After chaining, the execution graph is roughly:
> Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter
> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
>
>
> After running in production for a while, it started to fail. The logs say checkpoints cannot be completed and also mention Kafka network and connection exceptions. However, other Flink jobs read and write against the same brokers without errors. Our tentative theory is that each checkpoint takes fairly long to complete (a few hundred milliseconds) while our checkpoint interval is very short (only one second), and this may be hurting the job's performance. This is only a rough guess and we have no good starting point for troubleshooting, so we would like to hear your opinions or suggestions. Many thanks.
>
> Part of the error log:
> 2020-06-10 12:02:49,083 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
> 2020-06-10 12:04:47,898 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> c41f4811262db1c4c270b136571c8201 at
> container_e27_1591466310139_21670_01_06 @
> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> 2020-06-10 12:04:47,899 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Source: Custom Source -> Map -> Source_Map
> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map
> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: The server disconnected before a response was 

Re: Question about BETWEEN in flinksql

2020-06-09 Thread Leonard Xu
Hi,

From your description, you want a custom source (as the left table) joined against a MySQL dimension table. If that is the case, your SQL looks a bit off: what you wrote is a regular join, while the dimension (temporal) table join syntax [1] is:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency
Also, the JDBC connector implements LookupSource, i.e. it can be used as a dimension table, and the connector.lookup.cache.ttl
parameter controls how long cached dimension-table entries live. Not sure whether that meets your needs.

Best,
Leonard Xu

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector
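For reference, a dimension table declared with the Flink 1.10 JDBC connector could look roughly like the sketch below. The database URL, table and column names are placeholders; the lookup-cache options are the ones mentioned above.

import org.apache.flink.table.api.TableEnvironment;

public class JdbcLookupTableSketch {
    // Registers a JDBC-backed dimension table; all names here are invented.
    public static void registerRatesDimTable(TableEnvironment tableEnv) {
        tableEnv.sqlUpdate(
            "CREATE TABLE LatestRates (" +
            "  currency STRING," +
            "  rate DECIMAL(10, 4)" +
            ") WITH (" +
            "  'connector.type' = 'jdbc'," +
            "  'connector.url' = 'jdbc:mysql://localhost:3306/rates_db'," +
            "  'connector.table' = 'latest_rates'," +
            "  'connector.driver' = 'com.mysql.jdbc.Driver'," +
            "  'connector.username' = 'flink'," +
            "  'connector.password' = 'secret'," +
            // Lookup cache: cached rows expire after 10 minutes, so updates in MySQL
            // become visible to the temporal join once the TTL has passed.
            "  'connector.lookup.cache.max-rows' = '5000'," +
            "  'connector.lookup.cache.ttl' = '10min'" +
            ")");
    }
}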
 


> On Jun 10, 2020, at 10:43, 小屁孩 <932460...@qq.com> wrote:
> 
> hi, thanks for the guidance, it works now. May I ask one more thing: if what I join is a MySQL dimension table, and I use a custom source that periodically reloads the MySQL table, is it OK to join that with the stream table?
> 
> 
> 
> 
> -- Original message --
> From: "wangweigu...@stevegame.cn"  Sent: Tuesday, June 9, 2020, 6:35 PM
> To: "user-zh" 
> Subject: Re: Re: Question about BETWEEN in flinksql
> 
> 
> 
> 
>  On 1.10, it works with useBlinkPlanner but not with useOldPlanner!
>  
> It reports the error below:
>  Exception in thread "main" org.apache.flink.table.api.TableException: 
> Cannot generate a valid execution plan for the given query: 
> 
> LogicalProject(num=[$0])
>   LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner])
>     FlinkLogicalDataStreamScan(id=[1], fields=[num])
>     FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])
> 
> This exception indicates that the query uses an unsupported SQL feature.
> 
> 
> 
> 
> 
> From: 小屁孩
> Sent: 2020-06-09 17:41
> To: user-zh
> Subject: Re: Question about BETWEEN in flinksql
> hi, I am using:
> 1. flink 1.9.0
> 2. old planner
> 3. streaming mode
> 4. code roughly as follows
>     val sqlStream = env.createInput(jdbcInput)
>     tnv.registerDataStream("sqlStream", sqlStream, 'netstruct_id, 'start_ip, 'end_ip)
>     tnv.registerDataStream("OMstream", value, 'ip)
> //  val table = tnv.sqlQuery("select * from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip < b.end_ip")
>     val table = tnv.sqlQuery("select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip < b.end_ip")
>     val resRow = table.toRetractStream[Row]
> 
> 5. the error message is as follows
> Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
> generate a valid execution plan for the given query: 
> 
> 
> LogicalProject(netstruct_id=[$1])
>   LogicalJoin(condition=[AND(>($0, $2), <($0, $3))], joinType=[left])
>     FlinkLogicalDataStreamScan(id=[1], fields=[ip])
>     FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip])
> 
> 
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> at 
> org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
> at 
> org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
> at 
> org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
> at 
> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
> at 
> org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
> at 
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at 
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
> at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
> at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
> at 
> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
> at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
> at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)
> 
> 
> 
> 
> 
> 6. I also tried 
> select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip
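On the planner point above (the range join plans on the blink planner but not on the old planner), here is a small sketch of switching to the blink planner on Flink 1.9/1.10. The table registration from the original code is elided, so the query only resolves once OMstream and sqlStream have been registered.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class BlinkPlannerRangeJoinSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // useBlinkPlanner() instead of useOldPlanner(): the old planner rejects this
        // non-equi join with "Cannot generate a valid execution plan".
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tnv = StreamTableEnvironment.create(env, settings);

        // ... register OMstream and sqlStream here, as in the original job ...

        Table result = tnv.sqlQuery(
            "SELECT b.netstruct_id FROM OMstream AS a " +
            "LEFT JOIN sqlStream AS b ON a.ip > b.start_ip AND a.ip < b.end_ip");
    }
}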
> 

flink sql fails to consume from Kafka

2020-06-09 Thread Zhou Zach




Exception in thread "main" java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
994bd5a683143be23a23d77ed005d20d)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
at org.rabbit.sql.FromKafkaSinkMysql$.main(FromKafkaSinkMysql.scala:66)
at org.rabbit.sql.FromKafkaSinkMysql.main(FromKafkaSinkMysql.scala)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 994bd5a683143be23a23d77ed005d20d)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 31 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown 

Flink job cannot complete checkpoint snapshots and reports a Kafka exception

2020-06-09 Thread Zhefu PENG
Hi all,

We have a simple Flink job. After chaining, the execution graph is roughly:
Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter
-> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed

After running in production for a while, it started to fail. The logs say checkpoints cannot be completed and also mention Kafka network and connection exceptions. However, other Flink jobs read and write against the same brokers without errors. Our tentative theory is that each checkpoint takes fairly long to complete (a few hundred milliseconds) while our checkpoint interval is very short (only one second), and this may be hurting the job's performance. This is only a rough guess and we have no good starting point for troubleshooting, so we would like to hear your opinions or suggestions. Many thanks.

Part of the error log:
2020-06-10 12:02:49,083 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
2020-06-10 12:04:47,898 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
c41f4811262db1c4c270b136571c8201 at
container_e27_1591466310139_21670_01_06 @
hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
2020-06-10 12:04:47,899 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 1 for operator Source: Custom Source -> Map -> Source_Map ->
Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map ->
Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
Failed to send data to Kafka: The server disconnected before a response was
received.
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:973)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:317)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:978)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotStat(AbstractStreamOperator.java:402)
... 18 more
Caused by: org.apache.kafka.common.errors.NetworkException: The server
disconnected before a response was received.
2020-06-10 12:04:47,913 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
at

Re: How To subscribe a Kinesis Stream using enhance fanout?

2020-06-09 Thread Tzu-Li (Gordon) Tai
Hi all,

Just to give a quick update: there will be contributors from the AWS
Kinesis team working on contributing enhanced fan out support to the
connector.
You can follow the progress here:
https://issues.apache.org/jira/browse/FLINK-17688

Cheers,
Gordon

On Fri, May 15, 2020 at 5:55 PM orionemail 
wrote:

> Hi,
>
> We also recently needed this functionality, unfortunately we were unable
> to implement it ourselves so changed our plan accordingly.
>
> However we very much see the benefit for this feature and would be
> interested in following the JIRA ticket.
>
> Thanks
>
>
>
> ‐‐‐ Original Message ‐‐‐
> On Thursday, 14 May 2020 11:34, Xiaolong Wang 
> wrote:
>
> Thanks, I'll check it out.
>
> On Thu, May 14, 2020 at 6:26 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Xiaolong,
>>
>> You are right, the way the Kinesis connector is implemented / the way the
>> AWS APIs are used, does not allow it to consume Kinesis streams with
>> enhanced fan-out enabled consumers [1].
>> Could you open a JIRA ticket for this?
>> As far as I can tell, this could be a valuable contribution to the
>> connector for Kinesis users who require dedicated throughput isolated from
>> other running consumers.
>>
>> Cheers,
>> Gordon
>>
>> [1]
>> https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html
>>
>> On Wed, May 13, 2020 at 1:44 PM Xiaolong Wang <
>> xiaolong.w...@smartnews.com> wrote:
>>
>>> Hello Flink Community!
>>>
>>>   I'm currently coding on a project relying on AWS Kinesis. With the
>>> provided connector (flink-connector-kinesis_2.11;1.10.0), I can consume the
>>> message.
>>>
>>>  But as the main stream is used among several other teams, I was
>>> required to use the enhance fanout of Kinesis. I checked the connector code
>>> and found no implementations.
>>>
>>>  Has this issue occurred to anyone before?
>>>
>>> Thanks for your help.
>>>
>>
>


Re: Error when restarting from a savepoint after adding an operator to a FlinkSQL job

2020-06-09 Thread kcz
tks




-- Original message --
From: "Yichao Yang" <1048262...@qq.com>
Sent: Wednesday, June 10, 2020, 11:32 AM
To: "user-zh"
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

Re: Error when restarting from a savepoint after adding an operator to a FlinkSQL job

2020-06-09 Thread Yichao Yang
Hi


With Flink SQL you cannot set operator uids for the generated operators; if you need that, you would have to express the job with the DataStream API.


Best,
Yichao Yang




-- Original message --
From: "kcz" <573693...@qq.com>
Sent: Wednesday, June 10, 2020, 11:27 AM
To: "user-zh"
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

Re: Error when restarting from a savepoint after adding an operator to a FlinkSQL job

2020-06-09 Thread kcz
For a SQL job, how is the operatorID generated? Is there a way to specify the ID myself?




-- Original message --
From: "方盛凯"
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

Re: Error when restarting from a savepoint after adding an operator to a FlinkSQL job

2020-06-09 Thread 方盛凯
I read the documentation and code again more carefully. In org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint the matching is done by operator
ID; in other words, loading only succeeds when the operator ids of the new job match the ids in the savepoint. An operator
id comes about in one of two ways: 1. specified by the user; 2. auto-generated. Your job does not specify ids,
so Flink auto-generates them, and the id generation is very sensitive to the job structure: modifying the job made the generated operator
ids of the old and new jobs differ. For details see the section "Assigning Operator IDs" in the documentation I mentioned.
Note: a SQL statement is not equivalent to an operator, i.e. one SQL statement does not correspond to one operator.
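To illustrate the operator-ID point, here is a small, hypothetical DataStream sketch (it does not apply to pure SQL jobs, whose IDs are auto-generated): explicitly set uids survive job modifications, and unmatched state can be skipped with --allowNonRestoredState.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StableUidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c").uid("letters-source")   // user-chosen, stable operator IDs
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           }).uid("uppercase-map")
           .print().uid("stdout-sink");

        // For the SQL job in this thread the IDs are auto-generated from the graph structure,
        // so adding the second INSERT changes them; restoring the modified job from the old
        // savepoint then needs the unmatched state to be skipped:
        //   flink run -s <savepointPath> --allowNonRestoredState <jobJar>
        env.execute("stable-uid-sketch");
    }
}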

方盛凯 wrote on Tue, Jun 9, 2020, at 9:26 PM:

>
> You can look at the org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint method to understand why loading failed.
> For this problem, both the documentation and the code offer a solution: use the --allowNonRestoredState option. For usage details see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
>
> If anything is wrong, corrections are welcome.
>
> 陈赋赟 wrote on Mon, Jun 8, 2020, at 11:53 AM:
>
>> The original SQL job was:
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>  1
>> FROM
>> A_source
>> ;
>> After generating a savepoint from this FlinkSQL job, I modified it to:
>>
>>
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> CREATE TABLE C_source(...)
>> CREATE TABLE D_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>  1
>> FROM
>> A_source
>> ;
>>
>>
>> INSERT INTO C_sink
>> SELECT
>>  1
>> FROM
>> D_source
>> ;
>> and submitted it from the savepoint; the result was:
>>
>> Cannot map checkpoint/savepoint state for operator
>> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
>> is not available in the new program.
>> If you want to allow to skip this, you can set the
>> --allowNonRestoredState option on the CLI.
>>
>>
>> I would like to ask what, under the hood, causes the operator mismatch?
>
>


Re: Re: Question about BETWEEN in flinksql

2020-06-09 Thread Benchao Li
Do you mean your MySQL dimension table is custom, and its contents are refreshed periodically? As long as what you implement is a LookupSource, that should be fine.
You control the internal implementation yourself.

小屁孩 <932460...@qq.com> wrote on Wed, Jun 10, 2020, at 10:46 AM:

> hi, thanks for the guidance, it works now. May I ask one more thing: if what I join is a MySQL dimension table,
> and I use a custom source that periodically reloads the MySQL table, is it OK to join that with the stream table?
>
>
>
>
> -- Original message --
> From: "wangweigu...@stevegame.cn"  Sent: Tuesday, June 9, 2020, 6:35 PM
> To: "user-zh"
> Subject: Re: Re: Question about BETWEEN in flinksql
>
>
>
>
>  On 1.10, it works with useBlinkPlanner but not with useOldPlanner!
> 
> It reports the error below:
>  Exception in thread "main"
> org.apache.flink.table.api.TableException: Cannot generate a valid
> execution plan for the given query:
>
> LogicalProject(num=[$0])
>   LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner])
>     FlinkLogicalDataStreamScan(id=[1], fields=[num])
>     FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])
>
> This exception indicates that the query uses an unsupported SQL feature.
>
>
>
>
> 
> From: 小屁孩
> Sent: 2020-06-09 17:41
> To: user-zh
> Subject: Re: Question about BETWEEN in flinksql
> hi, I am using:
> 1. flink 1.9.0
> 2. old planner
> 3. streaming mode
> 4. code roughly as follows
>     val sqlStream = env.createInput(jdbcInput)
>     tnv.registerDataStream("sqlStream", sqlStream, 'netstruct_id, 'start_ip, 'end_ip)
>     tnv.registerDataStream("OMstream", value, 'ip)
> //  val table = tnv.sqlQuery("select * from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip < b.end_ip")
>     val table = tnv.sqlQuery("select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip < b.end_ip")
>     val resRow = table.toRetractStream[Row]
> 
> 5. the error message is as follows
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:
> 
> 
> LogicalProject(netstruct_id=[$1])
>   LogicalJoin(condition=[AND(>($0, $2), <($0, $3))], joinType=[left])
>     FlinkLogicalDataStreamScan(id=[1], fields=[ip])
>     FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip])
> 
> 
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
> at
> org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
> at
> org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
> at
> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
> at org.apache.flink.table.planner.StreamPlanner.org
> $apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
> at
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
> at
> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
> at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
> at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)
> 
> 
> 
> 
> 
> 6. I also tried 
> select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip
> and a single comparison alone does not work either. 
> 
> 
> Thanks!
> 
> 
> 
> 
> -- Original message --
> From: "Benchao Li"  Sent: Tuesday, June 9, 2020, 4:37 PM
> To: "user-zh" 
> Subject: Re: Question about BETWEEN in flinksql
> 
> 
> 
> Could you add the following information?
> 1. Which Flink version are you using?
> 2. Which planner, blink planner or old planner?
> 3. Streaming mode or batch mode?
> 4. What is the exact error message?
> 
> 小屁孩 <932460...@qq.com> wrote on Tue, Jun 9, 2020, at 4:26 PM:
> 
> > hi, in flinksql I used "select * from a join b" with a range condition comparing a.ip against b.startip and b.endip,
> > and it reported an error that the feature is not supported. Is there something like a BETWEEN function I can use for cases like this?


Re: Re: Question about BETWEEN in flinksql

2020-06-09 Thread 小屁孩
hi, thanks for the guidance, it works now. May I ask one more thing: if what I join is a MySQL dimension table,
and I use a custom source that periodically reloads the MySQL table, is it OK to join that with the stream table?




-- Original message --
From: "wangweigu...@stevegame.cn"

Re: GROUP BY on a stream

2020-06-09 Thread Benchao Li
For a GROUP BY without a window, it is generally recommended to configure a state retention time [1]. By default no state cleanup is done, so over time the state will grow very large.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
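A minimal sketch of that setting on the 1.9/1.10 Table API; the 12h/24h retention values are only an example.

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionSketch {
    // Keys that receive no updates for longer than the minimum time become eligible
    // for cleanup; cleanup is guaranteed before the maximum time is reached.
    public static void configure(StreamTableEnvironment tEnv) {
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}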

李奇 <359502...@qq.com> wrote on Wed, Jun 10, 2020, at 8:40 AM:

> You can skip the window and group directly on your fields; we have many business scenarios without windows, but this uses more memory. If it runs for too long it may even cause an OOM.
>
> > On Jun 9, 2020, at 12:24 PM, allanqinjy wrote:
> >
> > hi,
> >   So specifying update-mode retract is enough, right? OK, thanks a lot, I'll give it a try!
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >> On 2020-06-09 12:13:10, "1048262223" <1048262...@qq.com> wrote:
> >> Hi
> >>
> >>
> >> You can do it without a window; the result is just a retract stream rather than an append stream.
> >>
> >>
> >> Best,
> >> Yichao Yang
> >>
> >>
> >>
> >>
> >>
> >> Sent from my iPhone
> >>
> >>
> >> -- Original message --
> >> From: allanqinjy  Sent: June 9, 2020 12:11
> >> To: user-zh  Subject: Re: GROUP BY on a stream
>
>


Re: Dynamic rescaling in Flink

2020-06-09 Thread Xintong Song
Hi Prasanna,

Flink does not support dynamic rescaling at the moment.

AFAIK, there are some companies in China already have solutions for dynamic
scaling Flink jobs (Alibaba, 360, etc.), but none of them are yet available
to the community version. These solutions rely on an external system to
monitor the workload and rescale the job accordingly. In case of rescaling,
it requires a full stop of the data processing, then rescale, then recover
from the most recent checkpoint.

The Flink community is also preparing a declarative resource management
approach, which should allow the job to dynamically adapt to the available
resources (e.g., add/reduce pods on kubernetes). AFAIK, this is still in
the design discussion.

Thank you~

Xintong Song



On Wed, Jun 10, 2020 at 2:44 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi all,
>
> Does flink support dynamic scaling. Say try to add/reduce nodes based upon
> incoming load.
>
> Because our use case is such that we get peak loads for 4 hours and then
> medium loads for 8 hours and then light to no load for rest 2 hours.
>
> Or peak load would be atleast 5 times the medium load.
>
> Has anyone used flink in these type of scenario? We are looking at flink
> for it's low latency performance.
>
> Earlier I worked with Spark+YARN which provides a features to dynamicaly
> add/reduce executors.
>
> Wanted to know the same on flink.
>
> Thanks,
> Prasanna
>


Re: How to pass global variables in Flink

2020-06-09 Thread Px New
Right.
Just as 1048262223 said, at the moment I broadcast some dynamically updated rules downstream via broadcast state and handle them in the process method.
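For the original question below, a minimal sketch of that broadcast-state pattern; the sources, the "key=value" config format and all names are invented for illustration.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastConfigSketch {
    // Descriptor for the broadcast ("global config") state: key -> value strings.
    private static final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR =
            new MapStateDescriptor<>("config", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In a real job these would be e.g. a Kafka data stream and a config/rule stream.
        DataStream<String> events = env.socketTextStream("localhost", 9999);
        DataStream<String> configUpdates = env.socketTextStream("localhost", 9998);

        BroadcastStream<String> broadcastConfig = configUpdates.broadcast(CONFIG_DESCRIPTOR);

        events.connect(broadcastConfig)
              .process(new BroadcastProcessFunction<String, String, String>() {
                  @Override
                  public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                      // Read the latest config that has been broadcast so far.
                      String prefix = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get("prefix");
                      out.collect((prefix == null ? "" : prefix) + value);
                  }

                  @Override
                  public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                      // Each config record is assumed to look like "key=value".
                      String[] kv = value.split("=", 2);
                      if (kv.length == 2) {
                          ctx.getBroadcastState(CONFIG_DESCRIPTOR).put(kv[0], kv[1]);
                      }
                  }
              })
              .print();

        env.execute("broadcast-config-sketch");
    }
}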

zjfpla...@hotmail.com wrote on Tue, Jun 9, 2020, at 8:14 PM:

> hi,
> How do you pass global variables in Flink? A static class does not seem to work when running on the server side.
> The scenario is: when the Flink program starts, it reads configuration items from a config file; these items may be needed in the sink, the source and other places, so they are effectively global configuration for the whole program.
>
>
>
> zjfpla...@hotmail.com
>


Re: In Flink SQL, cannot call the createTemporaryTable() method on TableEnvironment, plus a TableException: findAndCreateTableSource failed

2020-06-09 Thread Px New
Hi *Benchao Li*, thanks, you are exactly right. I am now on my way with the SQL practice (good thing you pointed it out).

Benchao Li wrote on Tue, Jun 9, 2020, at 10:05 AM:

> Hi,
> I see you are using version 1.9.1, but createTemporaryTable was only introduced in 1.10. Which version of the documentation were you following?
>
> Px New <15701181132mr@gmail.com> wrote on Mon, Jun 8, 2020, at 10:00 PM:
>
> > Hi community: a question about using flink sql, plus a SQL exception.
> > 
> > When writing code following the structure given on the official site, I found that the method for registering a temporary table cannot be called? [Fig. 1, Fig. 2, Fig. 3]
> > Calling the createTemporaryTable method via tableEnvironment.
> >
> > I have checked, but still could not resolve it:
> > 1: the imported dependency is the one declared on the official site.
> > 2: the imported classes are flink.table.api.TableEnvironment and .java.StreamTableEnvironment,
> both classes.
> >
> >
> >  Fig. 1 (dependency imports):
> > https://imgkr.cn-bj.ufileos.com/941dbd86-34f4-4579-a53d-f7c04439d6f0.PNG
> >  Fig. 2 (import *):
> > https://imgkr.cn-bj.ufileos.com/e19b90f7-ef60-42d8-a93e-d65c4269e053.png
> >  Fig. 3 (cannot be called?):
> > https://imgkr.cn-bj.ufileos.com/579e5336-503c-490f-83b9-ebb46bd1b568.png
> >  Fig. 4 (format from the official site):
> > https://imgkr.cn-bj.ufileos.com/fcc7365e-aecf-49a3-94e1-ae64ee67122e.png
> > Fig. 5 (the TableException: findAndCreateTableSource failed exception)
> > https://imgkr.cn-bj.ufileos.com/386a783a-a42e-4466-97f8-540adb524be0.PNG
> > and:
> > https://imgkr.cn-bj.ufileos.com/9cac88cc-7601-4b03-ade1-f2432c84adac.PNG
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Re: How to pass global variables in Flink

2020-06-09 Thread 1048262223
Hi


You can use broadcast state [1]; alternatively, load the configuration in the open() method of a rich function (open() is called once when each operator instance starts).




[1]https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html


Best,
Yichao Yang


-- Original message --
From: "zjfpla...@hotmail.com"

Re: NoSuchMethodError: org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.(Ljava/lang/String;Lorg/apache/flink/fs/s3presto/common/HadoopConfigLoader

2020-06-09 Thread Guowei Ma
Hi,
In 1.10 there is no
'Lorg/apache/flink/fs/s3presto/common/HadoopConfigLoader' . So I think
there might be a legacy S3FileSystemFactory in your jar. You could check
whether there is a 'org.apache.flink.fs.s3presto.common.HadoopConfigLoader'
in your jar or not. If there is one you could remove the
old S3FileSystemFactory and try again.

Btw, I think you should not copy both flink-s3-fs-hadoop-1.10.0.jar
and flink-s3-fs-presto-1.10.0.jar into the same plugin dir. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
Best,
Guowei


Claude Murad  于2020年6月10日周三 上午4:06写道:

> Hello,
>
> I'm trying to upgrade Flink from 1.7 to 1.10 retaining our Hadoop
> integration.  I copied the jar
> file flink-shaded-hadoop-2-uber-2.7.5-10.0.jar into /opt/flink/lib.  I also
> copied the files flink-s3-fs-hadoop-1.10.0.jar and
> flink-s3-fs-presto-1.10.0.jar into /opt/flink/plugins/s3 folder.  The error
> below occurs after deploying and launching docker image 1.10.0-scala_2.11.
> I saw that S3FileSystemFactory.java is now importing
> org.apache.flink.runtime.util.HadoopConfigLoader instead of
> org.apache.flink.fs.s3.common.HadoopConfigLoader which is how it was
> before.  I see the jar file flink-dist_2.11-1.10.0.jar contains
> the org.apache.flink.runtime.util.HadoopConfigLoader and it is under the
> folder /opt/flink/lib.  Any ideas on how to resolve this error?  Any help
> would be greatly appreciated, thank you.
>
>
> ERROR org.apache.flink.core.fs.FileSystem   -
> Failed to load a file system via services
> java.util.ServiceConfigurationError:
> org.apache.flink.core.fs.FileSystemFactory: Provider
> org.apache.flink.fs.s3presto.S3PFileSystemFactory could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:232)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at
> org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1024)
> at
> org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1006)
> at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:303)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.configureFileSystems(ClusterEntrypoint.java:194)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:164)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
> at
> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64)
> Caused by: java.lang.NoSuchMethodError:
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.(Ljava/lang/String;Lorg/apache/flink/fs/s3presto/common/HadoopConfigLoader;)V
> at
> org.apache.flink.fs.s3presto.S3FileSystemFactory.(S3FileSystemFactory.java:50)
> at
> org.apache.flink.fs.s3presto.S3PFileSystemFactory.(S3PFileSystemFactory.java:24)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
>


Re: Re: How to pass global variables in Flink

2020-06-09 Thread zjfpla...@hotmail.com
??



zjfpla...@hotmail.com
 
From: 1048262223
Sent: 2020-06-09 20:17
To: user-zh
Subject: Re: How to pass global variables in Flink
Hi
 
 
You can read the configuration in the open() of a rich function, or use broadcast.
 
 
Best,
Yichao Yang
 
 
 
 
-- Original message --
From: "zjfpla...@hotmail.com"

Re: GROUP BY on a stream

2020-06-09 Thread 李奇
You can skip the window and group directly on your fields; we have many business scenarios without windows, but this uses more memory. If it runs for too long it may even cause an OOM.

> On Jun 9, 2020, at 12:24 PM, allanqinjy wrote:
> 
> hi,
>   So specifying update-mode retract is enough, right? OK, thanks a lot, I'll give it a try!
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
>> On 2020-06-09 12:13:10, "1048262223" <1048262...@qq.com> wrote:
>> Hi
>> 
>> 
>> You can do it without a window; the result is just a retract stream rather than an append stream.
>> 
>> 
>> Best,
>> Yichao Yang
>> 
>> 
>> 
>> 
>> 
>> Sent from my iPhone
>> 
>> 
>> -- Original message --
>> From: allanqinjy  Sent: June 9, 2020 12:11
>> To: user-zh  Subject: Re: GROUP BY on a stream



Re: Tumbling window with timestamp out-of-range events

2020-06-09 Thread Yu Yang
Please ignore this message. The issue was that a different timestamp
extractor was used when the kafka source was setup. That caused the issue.
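For completeness, here is a small sketch of keeping the source's timestamp/watermark extractor aligned with the field the window logic later checks. The Event POJO, topic, parsing and the 10-second out-of-orderness bound are stand-ins, not from the original job.

import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class EventTimeAlignmentSketch {

    public static class Event {
        public String key;
        public long timestamp;                       // epoch millis
        public long getTimestamp() { return timestamp; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "event-window-sketch");

        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
           .map(new MapFunction<String, Event>() {
               @Override
               public Event map(String line) {
                   // Assumed record layout: "<epochMillis>,<key>".
                   Event e = new Event();
                   String[] parts = line.split(",", 2);
                   e.timestamp = Long.parseLong(parts[0]);
                   e.key = parts.length > 1 ? parts[1] : "";
                   return e;
               }
           })
           // The extractor must return the same Event.getTimestamp() that the window's
           // process function compares against the window bounds; a different extractor
           // here is exactly what produces "out-of-range" events.
           .assignTimestampsAndWatermarks(
               new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                   @Override
                   public long extractTimestamp(Event e) {
                       return e.getTimestamp();
                   }
               })
           .print();

        env.execute("event-time-alignment-sketch");
    }
}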

On Tue, Jun 9, 2020 at 2:58 PM Yu Yang  wrote:

> Hi,
>
>
> We implement a flink application that uses TumblingWindow, and uses even
> time as time characteristics. In the TumblingWindow's process function, we
> has the implementation below that checks whether the event's timestamp is
> in the tumbling window's timestamp range.  We expected that all events
> shall be in the range. However, the application reports events with
> out-of-range timestamps.  Any insights on how this happens?
>
>
> @Override
> public void process(EventStreamPartitionKey key,
>   Context context, Iterable elements,
> Collector out) {
>
> for(Event event : elements) {
> if ( event.getTimestamp() >= context.window().getEnd() ||
>event.getTimestamp() < context.window().getStart() )
>
> System.out.println("NOT in RANGE: " + context.window().getStart()
>
> + ", " + event.getTimestamp() + ", " + context.window().getEnd());
> ...
>
> }
> out.collect(res);
> }
>
>
> Thanks!
>
>
> Regards,
>
> -Yu
>


Tumbling window with timestamp out-of-range events

2020-06-09 Thread Yu Yang
Hi,


We implement a flink application that uses TumblingWindow, and uses even
time as time characteristics. In the TumblingWindow's process function, we
has the implementation below that checks whether the event's timestamp is
in the tumbling window's timestamp range.  We expected that all events
shall be in the range. However, the application reports events with
out-of-range timestamps.  Any insights on how this happens?


@Override
public void process(EventStreamPartitionKey key,
  Context context, Iterable elements,
Collector out) {

for(Event event : elements) {
if ( event.getTimestamp() >= context.window().getEnd() ||
   event.getTimestamp() < context.window().getStart() )

System.out.println("NOT in RANGE: " + context.window().getStart()

+ ", " + event.getTimestamp() + ", " + context.window().getEnd());
...

}
out.collect(res);
}


Thanks!


Regards,

-Yu


Re: Stopping flink application with /jobs/:jobid/savepoints or /jobs/:jobid/stop

2020-06-09 Thread Deshpande, Omkar
I have observed that state gets drained irrespective of the value of the 
"drain".

I am using -
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-flink-1.9</artifactId>
  <version>2.19.0</version>
</dependency>

And I am running a Kafka wordcount app with a fixed window of 1 hour, and when I 
stop the app with the stop endpoint before 1 hour, the records get drained. I have 
tried with {"drain":true} and {"drain":false} in the body of the POST request. The 
drain behavior remains the same.


From: Kostas Kloudas 
Sent: Tuesday, June 9, 2020 4:48 AM
To: Deshpande, Omkar 
Cc: user@flink.apache.org ; Hwang, Nick 
; Benenson, Mikhail ; 
LeVeck, Matt ; Kathula, Sandeep 

Subject: Re: Stopping flink application with /jobs/:jobid/savepoints or 
/jobs/:jobid/stop

This email is from an external sender.


Hi Omkar,

For the first part of the question where you set the "drain" to false
and the state gets drained, this can be an issue on our side. Just to
clarify, no matter what is the value of the "drain", Flink always
takes a savepoint. Drain simply means that we also send MAX_WATERMARK
before taking the savepoint. Is this what you observe? I.e. that you
have an infinite input stream and even if you set drain to false, you
still see the MAX_WATERMARK?

For the second part of the question, the cancel-with-savepoint is a
deprecated command. But it is not removed for backwards compatibility.
So you can still have a cancel-with-savepoint in the way you
described. The difference between the deprecated cancel-with-savepoint
and the recommended stop-with-savepoint is that the
stop-with-savepoint guarantees that if you are using an exactly-once
sink, the side-effects are going to be committed to the sink before
the job exits. This was not the case for cancel-with-savepoint. For
more details, you can have a look at [1].

Cheers,
Kostas

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

On Tue, Jun 9, 2020 at 4:40 AM Deshpande, Omkar
 wrote:
>
> Hello,
>
> When I try to stop the job with /jobs/:jobid/stop REST endpoint, the state 
> gets drained, even if I pass {"drain":false} in the body of the post request. 
> Is the value of drain flag true by default? Why is not getting used when I 
> pass {"drain":false}?
>
> And I can also stop a job using this endpoint /jobs/:jobid/savepoints with 
> {"cancel-job":"true"} in the body. In this case there the state is not 
> drained. What is the difference between these 2 endpoints? Is there a reason 
> to use one over the other?
>
> If I want to stop a job with savepoint but without draining the state which 
> endpoint should be used?
>
> Omkar


NoSuchMethodError: org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.(Ljava/lang/String;Lorg/apache/flink/fs/s3presto/common/HadoopConfigLoader

2020-06-09 Thread Claude Murad
Hello,

I'm trying to upgrade Flink from 1.7 to 1.10 retaining our Hadoop
integration.  I copied the jar
file flink-shaded-hadoop-2-uber-2.7.5-10.0.jar into /opt/flink/lib.  I also
copied the files flink-s3-fs-hadoop-1.10.0.jar and
flink-s3-fs-presto-1.10.0.jar into /opt/flink/plugins/s3 folder.  The error
below occurs after deploying and launching docker image 1.10.0-scala_2.11.
I saw that S3FileSystemFactory.java is now importing
org.apache.flink.runtime.util.HadoopConfigLoader instead of
org.apache.flink.fs.s3.common.HadoopConfigLoader which is how it was
before.  I see the jar file flink-dist_2.11-1.10.0.jar contains
the org.apache.flink.runtime.util.HadoopConfigLoader and it is under the
folder /opt/flink/lib.  Any ideas on how to resolve this error?  Any help
would be greatly appreciated, thank you.


ERROR org.apache.flink.core.fs.FileSystem   -
Failed to load a file system via services
java.util.ServiceConfigurationError:
org.apache.flink.core.fs.FileSystemFactory: Provider
org.apache.flink.fs.s3presto.S3PFileSystemFactory could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at
org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1024)
at
org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1006)
at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:303)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.configureFileSystems(ClusterEntrypoint.java:194)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:164)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
at
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64)
Caused by: java.lang.NoSuchMethodError:
org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.(Ljava/lang/String;Lorg/apache/flink/fs/s3presto/common/HadoopConfigLoader;)V
at
org.apache.flink.fs.s3presto.S3FileSystemFactory.(S3FileSystemFactory.java:50)
at
org.apache.flink.fs.s3presto.S3PFileSystemFactory.(S3PFileSystemFactory.java:24)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)


Re: Troubles with Avro migration from 1.7 to 1.10

2020-06-09 Thread Kostas Kloudas
Hi Alan,

In the upcoming Flink 1.11 release, there will be support for Avro
using the AvroWriterFactory as seen in [1].
Do you think that this can solve your problem?

You can also download the current release-1.11 branch and also test it
out to see if it fits your needs.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-11395
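For what it's worth, once FLINK-11395 lands, the Avro bulk sink could look roughly like the sketch below. The exact 1.11 factory API may differ from what is shown, Feature is assumed to be an Avro specific-record class from the original job, and the date-based bucket assigner mirrors the old Bucketer.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class AvroStreamingFileSinkSketch {

    public static StreamingFileSink<Feature> createAvroSink(String hdfsPath) {
        return StreamingFileSink
                .forBulkFormat(new Path(hdfsPath), AvroWriters.forSpecificRecord(Feature.class))
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))  // date-based buckets
                .build();
    }

    public static void applySink(DataStream<Feature> outputStream, String hdfsPath) {
        outputStream
                .keyBy(Feature::getSessionId)
                .addSink(createAvroSink(hdfsPath))
                .uid("hdfs-avro-sink-v1")
                .name("hdfs_avro_sink");
    }
}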

On Tue, Jun 9, 2020 at 4:23 PM Alan Żur  wrote:
>
>
>
>
>
> Hi,
>
>
>
> I was assigned to migrate out Flink 1.7 to 1.10 so far it’s going good, 
> however I’ve encountered problem with Avro writing to hdfs. Currently we’re 
> using Bucketing sink – which is deprecated. I’ve managed to replace few 
> Bucketing sinks with StreamingFileSink with row format. However I don’t have 
> any idea how to tackle Avro and Writer<> implementation.
>
>
>
> @Override
> protected void applySink(DataStream outputStream) {
> outputStream
> .keyBy(Feature::getSessionId)
> .addSink(createSink())
> .uid(UID_PART.concat("sink-v1"))
> .name(UID_PART.concat("hdfs_bucketing_sink"));
> }
>
> private SinkFunction createSFSink() {
> return StreamingFileSink
> .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
> ParquetAvroWriters.forGenericRecord(new 
> ComboFeatureAvroWriter().createSchema()))
> .build();
> }
>
> private BucketingSink createSink() {
> return new BucketingSink(hdfsPath)
> .setBucketer(new DateTypeComboFeatureBucketer("yyyy-MM-dd", 
> ZoneOffset.UTC))
> .setBatchSize(batchSize)
> .setBatchRolloverInterval(batchRollingInterval)
> .setInactiveBucketCheckInterval(checkInactiveBucketInterval)
> .setInactiveBucketThreshold(inactiveBucketThreshold)
> .setUseTruncate(useTruncate)
> .setWriter(new ComboFeatureAvroWriter());
> }
>
> Above function createSFSink() I took from 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>  I’ve tried changing GenericRecord to Feature class – fail, I’ve tried to 
> write empty GenericRecord map just to get rid of compilation error – failed 
> (still giving improper type error). I’ve also tried to use 
> ParquetAvroWriters.forSpecificRecord(Feature.class) and also failed


Re: Age old stop vs cancel debate

2020-06-09 Thread Kostas Kloudas
I understand. Thanks for looking into it Senthil!

Kostas

On Tue, Jun 9, 2020 at 7:32 PM Senthil Kumar  wrote:
>
> OK, will do and report back.
>
> We are on 1.9.1,
>
> 1.10 will take some time __
>
> On 6/9/20, 2:06 AM, "Kostas Kloudas"  wrote:
>
> Hi Senthil,
>
> From a quick look at the code, it seems that the cancel() of your
> source function should be called, and the thread that it is running on
> should be interrupted.
>
> In order to pin down the problem and help us see if this is an actual
> bug, could you please:
> 1) enable debug logging and see if you can spot some lines like this:
>
> "Starting checkpoint (-ID) SYNC_SAVEPOINT on task X" or sth
> similar with synchronous savepoint in it
>
> and any other message afterwards with -ID in it to see if the
> savepoint is completed successfully.
>
> 2) could you see if this behavior persists in the FLINK-1.10?
>
> Thanks,
> Kostas
>
> On Tue, Jun 2, 2020 at 4:20 PM Senthil Kumar  wrote:
> >
> > Robert,
> >
> >
> >
> > Thank you once again! We are currently doing the “short” Thread.sleep() 
> approach. Seems to be working fine.
> >
> >
> >
> > Cheers
> >
> > Kumar
> >
> >
> >
> > From: Robert Metzger 
> > Date: Tuesday, June 2, 2020 at 2:40 AM
> > To: Senthil Kumar 
> > Cc: "user@flink.apache.org" 
> > Subject: Re: Age old stop vs cancel debate
> >
> >
> >
> > Hi Kumar,
> >
> >
> > this is more a Java question than a Flink question now :) If it is 
> easily possible from your code, then I would regularly check the isRunning 
> flag (by having short Thread.sleeps()) to have a proper cancellation behavior.
> >
> > If this makes your code very complicated, then you could work with 
> manually interrupting your worker thread. I would only use this method if you 
> are sure your code (and the libraries you are using) are properly handling 
> interrupts.
> >
> > Sorry that I can not give you a more actionable response. It depends a 
> lot on the structure of your code and the libraries you are calling into.
> >
> >
> >
> > Best,
> >
> > Robert
> >
> >
> >
> >
> >
> > On Fri, May 29, 2020 at 10:48 PM Senthil Kumar  
> wrote:
> >
> > Hi Robert,
> >
> >
> >
> > Would appreciate more insights please.
> >
> >
> >
> > What we are noticing: When the flink job is issued a stop command, the 
> Thread.sleep is not receiving the InterruptedException
> >
> >
> >
> > It certainly receives the exception when the flink job is issued a 
> cancel command.
> >
> >
> >
> > In both cases (cancel and stop) the cancel() method is getting called 
> (to set the isRunning variable to false)
> >
> >
> >
> > However, given that the thread does not get interrupted in stop, we are 
> not in a position to check the isRunning variable.
> >
> >
> >
> >
> >
> > For now, we are doing a Thread.sleep  every 5 minutes (instead of the 
> normal interval which is in hours).
> >
> > Sleeping for 5 minutes gives us a chance to check the isRunning 
> variable.
> >
> >
> >
> > Another approach would be to save the currentThread 
> (Thread.currentThread()) before doing a Thread.sleep())
> >
> > and manually calling Thread.interrupt() from the cancel function.
> >
> >
> >
> > What is your recommendation?
> >
> >
> >
> > Cheers
> >
> > Kumar
> >
> >
> >
> >
> >
> > From: Robert Metzger 
> > Date: Friday, May 29, 2020 at 4:38 AM
> > To: Senthil Kumar 
> > Cc: "user@flink.apache.org" 
> > Subject: Re: Age old stop vs cancel debate
> >
> >
> >
> > Hi Kumar,
> >
> >
> >
> > The way you've implemented your custom source sounds like the right 
> way: Having a "running" flag checked by the run() method and changing it in 
> cancel().
> >
> > Also, it is good that you are properly handling the interrupt set by 
> Flink (some people ignore InterruptedExceptions, which make it difficult 
> (basically impossible) for Flink to stop the job)
> >
> >
> >
> > Best,
> >
> > Robert
> >
> >
> >
> >
> >
> > On Wed, May 27, 2020 at 7:38 PM Senthil Kumar  
> wrote:
> >
> > We are on flink 1.9.0
> >
> >
> >
> > I have a custom SourceFunction, where I rely on isRunning set to false 
> via the cancel() function to exit out of the run loop.
> >
> > My run loop essentially gets the data from S3, and then simply sleeps 
> (Thread.sleep) for a specified time interval.
> >
> >
> >
> > When a job gets cancelled, the SourceFunction.cancel() is called, which 
> sets the isRunning to false.
> >
> > In addition, the Thread.sleep 

Dynamic rescaling in Flink

2020-06-09 Thread Prasanna kumar
Hi all,

Does Flink support dynamic scaling, i.e. adding/removing nodes based on the
incoming load?

Our use case is such that we get peak load for 4 hours, then
medium load for 8 hours, and then light to no load for the remaining 2 hours.

Our peak load would be at least 5 times the medium load.

Has anyone used Flink in this type of scenario? We are looking at Flink
for its low-latency performance.

Earlier I worked with Spark+YARN, which provides a feature to dynamically
add/remove executors.

I wanted to know whether the same is possible with Flink.

Thanks,
Prasanna


Re: Age old stop vs cancel debate

2020-06-09 Thread Senthil Kumar
OK, will do and report back.

We are on 1.9.1,

1.10 will take some time __

On 6/9/20, 2:06 AM, "Kostas Kloudas"  wrote:

Hi Senthil,

From a quick look at the code, it seems that the cancel() of your
source function should be called, and the thread that it is running on
should be interrupted.

In order to pin down the problem and help us see if this is an actual
bug, could you please:
1) enable debug logging and see if you can spot some lines like this:

"Starting checkpoint (-ID) SYNC_SAVEPOINT on task X" or sth
similar with synchronous savepoint in it

and any other message afterwards with -ID in it to see if the
savepoint is completed successfully.

2) could you see if this behavior persists in the FLINK-1.10?

Thanks,
Kostas

On Tue, Jun 2, 2020 at 4:20 PM Senthil Kumar  wrote:
>
> Robert,
>
>
>
> Thank you once again! We are currently doing the “short” Thread.sleep() 
approach. Seems to be working fine.
>
>
>
> Cheers
>
> Kumar
>
>
>
> From: Robert Metzger 
> Date: Tuesday, June 2, 2020 at 2:40 AM
> To: Senthil Kumar 
> Cc: "user@flink.apache.org" 
> Subject: Re: Age old stop vs cancel debate
>
>
>
> Hi Kumar,
>
>
> this is more a Java question than a Flink question now :) If it is easily 
possible from your code, then I would regularly check the isRunning flag (by 
having short Thread.sleeps()) to have a proper cancellation behavior.
>
> If this makes your code very complicated, then you could work with 
manually interrupting your worker thread. I would only use this method if you 
are sure your code (and the libraries you are using) are properly handling 
interrupts.
>
> Sorry that I can not give you a more actionable response. It depends a 
lot on the structure of your code and the libraries you are calling into.
>
>
>
> Best,
>
> Robert
>
>
>
>
>
> On Fri, May 29, 2020 at 10:48 PM Senthil Kumar  
wrote:
>
> Hi Robert,
>
>
>
> Would appreciate more insights please.
>
>
>
> What we are noticing: When the flink job is issued a stop command, the 
Thread.sleep is not receiving the InterruptedException
>
>
>
> It certainly receives the exception when the flink job is issued a cancel 
command.
>
>
>
> In both cases (cancel and stop) the cancel() method is getting called (to 
set the isRunning variable to false)
>
>
>
> However, given that the thread does not get interrupted in stop, we are 
not in a position to check the isRunning variable.
>
>
>
>
>
> For now, we are doing a Thread.sleep  every 5 minutes (instead of the 
normal interval which is in hours).
>
> Sleeping for 5 minutes gives us a chance to check the isRunning variable.
>
>
>
> Another approach would be to save the currentThread 
(Thread.currentThread()) before doing a Thread.sleep())
>
> and manually calling Thread.interrupt() from the cancel function.
>
>
>
> What is your recommendation?
>
>
>
> Cheers
>
> Kumar
>
>
>
>
>
> From: Robert Metzger 
> Date: Friday, May 29, 2020 at 4:38 AM
> To: Senthil Kumar 
> Cc: "user@flink.apache.org" 
> Subject: Re: Age old stop vs cancel debate
>
>
>
> Hi Kumar,
>
>
>
> The way you've implemented your custom source sounds like the right way: 
Having a "running" flag checked by the run() method and changing it in cancel().
>
> Also, it is good that you are properly handling the interrupt set by 
Flink (some people ignore InterruptedExceptions, which make it difficult 
(basically impossible) for Flink to stop the job)
>
>
>
> Best,
>
> Robert
>
>
>
>
>
> On Wed, May 27, 2020 at 7:38 PM Senthil Kumar  
wrote:
>
> We are on flink 1.9.0
>
>
>
> I have a custom SourceFunction, where I rely on isRunning set to false 
via the cancel() function to exit out of the run loop.
>
> My run loop essentially gets the data from S3, and then simply sleeps 
(Thread.sleep) for a specified time interval.
>
>
>
> When a job gets cancelled, the SourceFunction.cancel() is called, which 
sets the isRunning to false.
>
> In addition, the Thread.sleep gets interrupted, a check Is made on the 
isRunning variable (set to false now) and the run loop is exited.
>
>
>
> We noticed that when we “stop” the flink job, the Thread.sleep does not 
get interrupted.
>
> It also appears that SoruceFunction.cancel() is not getting called (which 
seems like the correct behavior for “stop”)
>
>
>
> My question: what’s the “right” way to exit the run() loop when the flink 
job 

Re: Failed to deserialize Avro record

2020-06-09 Thread Ramana Uppala
Hi Arvid / Dawid,

Yes, we did a small POC with a custom Avro row deserializer that uses 
ConfluentRegistryAvroDeserializationSchema, and we are able to parse the message.

We have a Schema Registry, and users are given the choice to produce with different 
serialization mechanisms. Some messages we are able to parse with 
"AvroRowDeserializationSchema"; some we couldn't. Our understanding is that the 
topics with failing messages are produced with Confluent serialization.

Is there any uniform Avro row deserialization that works with all scenarios?

On 2020/06/09 11:03:23, Arvid Heise  wrote: 
> If data is coming from Kafka, the write schema is most likely stored in a
> Schema Registry. If so, you absolutely need to use
> ConfluentRegistryAvroSerializationSchema of the
> *flink-avro-confluent-registry* package.
> 
> If you didn't opt for that most common architecture pattern, then you often
> run into that the write schema and the supplied schema do not match. That
> could also be the case here (but try the other serialization schema first).
> If it still prevails, please elaborate how you manage the schema. It's also
> helpful to see an example record and the schema if possible.
> 
> On Tue, Jun 9, 2020 at 11:10 AM Dawid Wysakowicz 
> wrote:
> 
> > It's rather hard to help if we don't know the format in which the
> > records are serialized. There is a significant difference if you use a
> > schema registry or not. All schema registries known to me prepend the
> > actual data with some kind of magic byte and an identifier of the
> > schema. Therefore if we do not know to expect that we cannot properly
> > deserialize the record.
> >
> > Nevertheless I would not say the problem has something to do with schema
> > registry. If I understand you correctly some records can be
> > deserialized. If they were produced with the schema registry type of
> > serialization all would fail.
> >
> > What I can recommend is to try to log/identify a record that cannot be
> > deserialized and check debug the AvroRowDeserializationSchema with it.
> >
> > Best,
> >
> > Dawid
> >
> > On 06/06/2020 16:27, Ramana Uppala wrote:
> > > We are using AvroRowDeserializationSchema with Kafka Table source to
> > deserialize the messages. Application failed with "Failed to deserialize
> > Avro record." for different messages it seems.
> > >
> > > Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length
> > is negative: -26
> > >
> > > Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
> > >   at
> > org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
> > ~[avro-1.8.2.jar:1.8.2]
> > >
> > > We are not sure what the serialization mechanism producer is using to
> > publish the messages at this time. But above errors are related to
> > https://issues.apache.org/jira/browse/FLINK-16048 ?
> > >
> > > Any suggestions on fixing above issues ? we are using Flink 1.10
> >
> >
> 
> -- 
> 
> Arvid Heise | Senior Java Developer
> 
> 
> 
> Follow us @VervericaData
> 
> --
> 
> Join Flink Forward  - The Apache Flink
> Conference
> 
> Stream Processing | Event Driven | Real Time
> 
> --
> 
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> 
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
> 


Re: Flink Stream job to parquet sink

2020-06-09 Thread aj
please help with this. Any suggestions.

On Sat, Jun 6, 2020 at 12:20 PM aj  wrote:

> Hello All,
>
> I am receiving a set of events in Avro format on different topics. I want
> to consume these and write to s3 in parquet format.
> I have written a below job that creates a different stream for each event
> and fetches it schema from the confluent schema registry to create a
> parquet sink for an event.
> This is working fine, but the only problem I am facing is that whenever a new
> event type starts coming in, I have to change the YAML config and restart the job
> every time. Is there any way to avoid restarting the job so that it starts
> consuming the new set of events automatically?
>
>
> YAML config :
>
> !com.bounce.config.EventTopologyConfig
> eventsType:
>   - !com.bounce.config.EventConfig
> event_name: "search_list_keyless"
> schema_subject: 
> "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
> topic: "search_list_keyless"
>
>   - !com.bounce.config.EventConfig
> event_name: "bike_search_details"
> schema_subject: 
> "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
> topic: "bike_search_details"
>
>   - !com.bounce.config.EventConfig
> event_name: "keyless_bike_lock"
> schema_subject: 
> "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
> topic: "analytics-keyless"
>
>   - !com.bounce.config.EventConfig
>   event_name: "keyless_bike_unlock"
>   schema_subject: 
> "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>   topic: "analytics-keyless"
>
>
> checkPointInterval: 120
>
> topics: ["search_list_keyless","bike_search_details","analytics-keyless"]
>
>
>
>
>
> *Sink code :*
>
>   YamlReader reader = new YamlReader(topologyConfig);
> EventTopologyConfig eventTopologyConfig = 
> reader.read(EventTopologyConfig.class);
>
> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
> topics = eventTopologyConfig.getTopics();
>
> List eventTypesList = 
> eventTopologyConfig.getEventsType();
>
> CachedSchemaRegistryClient registryClient = new 
> CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>
>
> FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
> new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
> properties);
>
> DataStream dataStream = 
> streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>
> try {
> for (EventConfig eventConfig : eventTypesList) {
>
> LOG.info("creating a stream for ", eventConfig.getEvent_name());
>
> final StreamingFileSink sink = StreamingFileSink.forBulkFormat
> (path, 
> ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject(),
>  registryClient)))
> .withBucketAssigner(new EventTimeBucketAssigner())
> .build();
>
> DataStream outStream = 
> dataStream.filter((FilterFunction) genericRecord -> {
> if (genericRecord != null && 
> genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) 
> {
> return true;
> }
> return false;
> });
> 
> outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
>
> }
> } catch (Exception e) {
> e.printStackTrace();
> }
>
>
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> 
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: [External Sender] Re: Flink sql nested elements

2020-06-09 Thread Ramana Uppala
Hi Dawid,

This issue has been resolved.

From our debugging we found out that the Calcite parser was able to resolve the 
nested elements as expected, but it expects the case to match the schema. Our 
SQL select field case and the schema field case did not match in this scenario. 
After fixing the SQL to use the correct case, the query worked as expected.

Is Flink SQL case sensitive? We don't see any documentation related to 
this.

It would be great if all query elements could be converted to lower case, similar to 
Hive.

On 2020/06/09 07:58:20, Dawid Wysakowicz  wrote: 
> Hi Ramana,
> 
> Could you help us with a way to reproduce the behaviour? I could not
> reproduce it locally. The code below works for me just fine:
> 
> StreamExecutionEnvironment exec = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>         exec,
>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
> tEnv.registerTableSource(
>         "T",
>         new StreamTableSource<Row>() {
>             @Override
>             public TableSchema getTableSchema() {
>                 return TableSchema.builder()
>                         .field("f3", DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING())))
>                         .build();
>             }
>
>             @Override
>             public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
>                 return execEnv.fromCollection(
>                         Arrays.asList(Row.of(Row.of("ABCDE")))
>                 );
>             }
>
>             @Override
>             public DataType getProducedDataType() {
>                 return DataTypes.ROW(
>                         DataTypes.FIELD(
>                                 "f3",
>                                 DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING()))
>                         )
>                 );
>             }
>         });
> Table table = tEnv.sqlQuery("SELECT f3.nested FROM T");
> DataStream<Row> result = tEnv.toAppendStream(
>         table,
>         Types.ROW(Types.STRING()));
> result.print();
> exec.execute();
> 
> Best,
> 
> Dawid
> 
> On 05/06/2020 13:59, Ramana Uppala wrote:
> > Hi Leonard,
> >
> > We are using Flink 1.10 version and I can not share the complete
> > schema but it looks like below in Hive Catalog, 
> >
> > flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647),
> > `postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
> > VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
> >
> > Based on the stack trace, sqlUpdate API validates the sql statement
> > and throwing the above error.  Do we need to configure any Calcite
> > configuration to support nested types ?
> >
> > Thanks,
> > Ramana.
> >
> > On Fri, Jun 5, 2020 at 12:49 AM Leonard Xu  > > wrote:
> >
> > Hi,Ramana
> >
> > For nested data type, Flink use dot (eg a.b.c) to visit nested
> > elements. Your SQL syntax looks right, which Flink version are you
> > using? And could you post your Avro Schema file and DDL ?
> >
> > Best,
> > Leonard Xu
> >
> > > On Jun 5, 2020, at 03:34, Ramana Uppala wrote:
> > >
> > > We have Avro schema that contains nested structure and when
> > querying using Flink SQL, we are getting below error.
> > >
> > > Exception in thread "main" java.lang.AssertionError
> > >       at
> > org.apache.calcite.sql.parser.SqlParserPos.sum_(SqlParserPos.java:236)
> > >       at
> > org.apache.calcite.sql.parser.SqlParserPos.sum(SqlParserPos.java:226)
> > >       at
> > 
> > org.apache.calcite.sql.SqlIdentifier.getComponent(SqlIdentifier.java:232)
> > >       at
> > 
> > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:416)
> > >       at
> > 
> > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5733)
> > >       at
> > 
> > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5718)
> > >       at
> > org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
> > >
> > > Example Schema:
> > > ROW<`col1` VARCHAR(2147483647), `postalAddress`
> > ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
> > VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
> > >
> > > Example SQL:
> > > insert into CSVSink
> > > select
> > > col1,
> > > postalAddress.addressLine1 as address
> > > from myStream
> > >
> > > In Flink SQL, How to select nested elements ?
> > >
> >
> > 
> >
> >
> > The information contained in this 

Re: Understanding Flink statefun deployment

2020-06-09 Thread Igal Shilman
Hi Francesco,

It is absolutely possible to deploy some functions as embedded and some as
remote, and scale them independently, while technically being part of the
same
stateful function application instance (I think that is what you meant by
"sharing the same master").

One possible way to do it in k8s, is to have separate deployments:
1) Embedded functions would be bundled with the Docker image that starts
the Flink cluster. (flink-statefun docker image)
2) Remote functions would be packaged in a separate image deployed as a
separate kubernetes deployment, and reachable via a k8s service.

For the second part, check out the demo at the end of the keynote [1] and
also [2][3][4]

[1] https://www.youtube.com/watch?v=NF0hXZfUyqE
[2]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-python-k8s-example/python-worker-deployment.yaml
[3]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-python-k8s-example/python-worker-service.yaml#L19
[4]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-python-k8s-example/module.yaml#L26

Good luck,
Igal.





On Tue, Jun 9, 2020 at 10:50 AM Francesco Guardiani <
francescogu...@gmail.com> wrote:

> Hi everybody,
> I'm quite new to Flink and Flink Statefun and I'm trying to understand the
> deployment techniques on k8s.
> I wish to understand if it's feasible to deploy a statefun project
> separating the different functions on separate deployments (in order to
> have some functions as remote and some as embedded) all connected to the
> same master. The idea is that I can scale the deployments independently
> using the Kubernetes HPA and these instances cooperate automatically using
> the same master. For example, given a flow like kafka -> fn a -> fn b ->
> kafka:
>
> * Remote function A (plus ingress) in deployment fn-a, where the function
> process is deployed as another container in the same pod
> * embedded function B (plus egress) in deployment fn-b
> * master deployment in flink-master
>
> Does that make sense at all in Flink architecture? If it's feasible, do
> you have any example?
>
> FG
>
> --
> Francesco Guardiani
> Website: https://slinkydeveloper.com/
> Twitter: https://twitter.com/SlinkyGuardiani
>
> Github: https://github.com/slinkydeveloper
>


Incremental state

2020-06-09 Thread Annemarie Burger
Hi,

What I'm trying to do is the following: I want to incrementally add elements to
and delete elements from a state. If an element expires/goes out of the window,
it needs to be removed from the state. I basically want the functionality of
TTL without using it, since I'm also using Queryable State and these two
features can't be combined. Of course I can give a "valid until" time to
each element when I'm adding it to the state using a ProcessFunction, and
periodically iterate over the state to remove expired elements, but I was
wondering if there is a more efficient way. For example, using a timer that is
given the element as a parameter, so that when the timer fires, x seconds after
it was set, it can look up the element directly and remove it. But how would I
implement this?

Thanks!
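A minimal sketch of the timer-based idea described above, assuming a keyed stream of strings, processing-time timers and a MapState keyed by expiry timestamp (element type, TTL and state names are illustrative only, not from this thread):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keeps elements for TTL_MS and drops them when their timer fires,
// without iterating over the whole state.
public class ExpiringElementsFunction extends KeyedProcessFunction<String, String, String> {

    private static final long TTL_MS = 60_000L;

    // expiry timestamp -> elements that expire at exactly that timestamp
    private transient MapState<Long, List<String>> elementsByExpiry;

    @Override
    public void open(Configuration parameters) {
        elementsByExpiry = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("elementsByExpiry", Types.LONG, Types.LIST(Types.STRING)));
    }

    @Override
    public void processElement(String element, Context ctx, Collector<String> out) throws Exception {
        long expiry = ctx.timerService().currentProcessingTime() + TTL_MS;
        List<String> bucket = elementsByExpiry.get(expiry);
        if (bucket == null) {
            bucket = new ArrayList<>();
        }
        bucket.add(element);
        elementsByExpiry.put(expiry, bucket);
        // one timer per expiry timestamp; when it fires we know exactly which entry to drop
        ctx.timerService().registerProcessingTimeTimer(expiry);
        out.collect(element);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // removes only the elements stored under this timestamp, no full iteration needed
        elementsByExpiry.remove(timestamp);
    }
}

The same idea works with event-time timers if the expiry should follow the watermark rather than wall-clock time.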



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Troubles with Avro migration from 1.7 to 1.10

2020-06-09 Thread Alan Żur


Hi,

I was assigned to migrate our Flink 1.7 setup to 1.10. So far it's going well; however, 
I've encountered a problem with writing Avro to HDFS. Currently we're using 
BucketingSink, which is deprecated. I've managed to replace a few BucketingSinks 
with StreamingFileSink using the row format. However, I have no idea how 
to tackle Avro and the Writer<> implementation.

@Override
protected void applySink(DataStream outputStream) {
outputStream
.keyBy(Feature::getSessionId)
.addSink(createSink())
.uid(UID_PART.concat("sink-v1"))
.name(UID_PART.concat("hdfs_bucketing_sink"));
}

private SinkFunction createSFSink() {
return StreamingFileSink
.forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
ParquetAvroWriters.forGenericRecord(new 
ComboFeatureAvroWriter().createSchema()))
.build();
}

private BucketingSink createSink() {
return new BucketingSink(hdfsPath)
.setBucketer(new DateTypeComboFeatureBucketer("-MM-dd", 
ZoneOffset.UTC))
.setBatchSize(batchSize)
.setBatchRolloverInterval(batchRollingInterval)
.setInactiveBucketCheckInterval(checkInactiveBucketInterval)
.setInactiveBucketThreshold(inactiveBucketThreshold)
.setUseTruncate(useTruncate)
.setWriter(new ComboFeatureAvroWriter());
}
I took the createSFSink() function above from 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
I've tried changing GenericRecord to the Feature class - failed. I've tried writing an 
empty GenericRecord map just to get rid of the compilation error - failed (still 
getting an improper-type error). I've also tried to use 
ParquetAvroWriters.forSpecificRecord(Feature.class), and that failed as well.
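For what it's worth, a minimal sketch of the bulk-format variant, typed to the element class; it assumes Feature is a plain class readable via Avro reflection (ParquetAvroWriters.forSpecificRecord(Feature.class) would be the choice if Feature is a generated Avro SpecificRecord), reuses hdfsPath from the snippet above, and is only a sketch, not a tested fix. Note it writes Parquet, and bulk formats roll part files on every checkpoint, so the batch-size and rollover settings of the old BucketingSink do not carry over:

import java.time.ZoneId;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

// sits in the same class as the snippets above
private SinkFunction<Feature> createSFSink() {
    return StreamingFileSink
            .<Feature>forBulkFormat(
                    new Path(hdfsPath),
                    ParquetAvroWriters.forReflectRecord(Feature.class))
            // date-based buckets, similar in spirit to the old DateTypeComboFeatureBucketer
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("UTC")))
            .build();
}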


Re: flink JobManager HA exception: the fencing token is null

2020-06-09 Thread tison
Oh, then it should be the problem described above.

The fact that your Dispatcher can be discovered means the initial leader election and address
publication were OK. You could post your HA configuration so we can check for anything obviously off,
and then search the logs for the lost-leadership entries; usually they are surrounded by a bunch of
ZooKeeper ConnectionLoss or SessionExpire messages.

Best,
tison.


whirly wrote on Tue, Jun 9, 2020 at 9:23 PM:

> Flink 1.8
>
>
>
>
> whirly
> Email: whir...@163.com
>
> Signature customized by NetEase Mail Master
>
> On 2020-06-09 21:15, tison wrote:
> Which Flink version? After the big Dispatcher rework in 1.10 this should no longer be null.
>
> Best,
> tison.
>
>
> whirly wrote on Tue, Jun 9, 2020 at 8:58 PM:
>
> > Hi all:
> > Some jobs in our environment hit the exception below. We suspect it is a problem with the Flink JobManager HA configuration. What could be the cause?
> >
> >
> > Exception message:
> > Internal server error.,
> >  > side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
> >  Fencing token not set: Ignoring message
> > LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
> > LocalRpcInvocation(requestMultipleJobDetails(Time)))
> > sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because
> the
> > fencing token is null.
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> > at
> >
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> > at
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > End of exception on server side>
>


Re: Data Quality Library in Flink

2020-06-09 Thread aj
Thanks, Andrey, I will check it out.

On Mon, Jun 8, 2020 at 8:10 PM Andrey Zagrebin  wrote:

> Hi Anuj,
>
> I am not familiar with data quality measurement methods and deequ
>  in depth.
> What you describe looks like monitoring some data metrics.
> Maybe there are other community users aware of a better solution.
> Meanwhile, I would recommend implementing the checks and failures as
> separate operators and side outputs (for streaming) [1], if not done yet.
> Then you could also use Flink metrics to aggregate and monitor the data
> [2].
> The metrics systems usually allow to define alerts on metrics, like in
> prometheus [3], [4].
>
> Best,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/side_output.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter
> [4] https://prometheus.io/docs/alerting/overview/
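To make the side-output/metrics suggestion concrete, here is a minimal, hypothetical sketch; the Event type, the userId field and the metric names are assumptions for illustration, not something from this thread:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Hypothetical record type used only for this sketch.
class Event {
    String userId;
}

public class FillRateCheckFunction extends ProcessFunction<Event, Event> {

    // records failing validation go to this side output instead of failing the job
    public static final OutputTag<Event> REJECTED = new OutputTag<Event>("rejected") {};

    private transient Counter totalRecords;
    private transient Counter missingUserId;

    @Override
    public void open(Configuration parameters) {
        totalRecords = getRuntimeContext().getMetricGroup().counter("totalRecords");
        missingUserId = getRuntimeContext().getMetricGroup().counter("missingUserId");
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) {
        totalRecords.inc();
        if (event.userId == null) {       // hypothetical column whose fill rate is measured
            missingUserId.inc();
            ctx.output(REJECTED, event);  // route to an error/side stream
        } else {
            out.collect(event);
        }
    }
}

The fill rate can then be derived in the metrics backend (for example 1 - missingUserId / totalRecords) and alerting configured there as in [3]/[4], while the rejected records are read via getSideOutput(FillRateCheckFunction.REJECTED) on the resulting stream.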
>
> On Sat, Jun 6, 2020 at 9:23 AM aj  wrote:
>
>> Hello All,
>>
>> I want to do some data quality analysis on stream data, for example:
>>
>> 1. Fill rate in a particular column
>> 2. How many events are going to the error queue because Avro schema
>> validation failed?
>> 3. Different statistical measures of a column.
>> 4. Alert if a particular threshold is breached (like if the fill rate is less
>> than 90% for a column)
>>
>> Is there a library on top of Flink for data quality? I see there is a library
>> on top of Spark:
>> https://github.com/awslabs/deequ
>>
>> This provides all that I am looking for.
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>> 
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: Error when restarting a Flink SQL job from a savepoint after adding operators

2020-06-09 Thread 方盛凯
You can look at the org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint method to see why loading failed.
For this problem, both the documentation and the code provide a solution: use the --allowNonRestoredState option. For usage details see the docs:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

If anything here is wrong, corrections are welcome.

陈赋赟 wrote on Mon, Jun 8, 2020 at 11:53 AM:

> The original SQL job was:
> CREATE TABLE A_source(...)
> CREATE TABLE B_sink (...)
> INSERT INTO B_sink
> SELECT
>  1
> FROM
> A_source
> ;
> After generating a savepoint from this Flink SQL job, I modified it to:
>
>
> CREATE TABLE A_source(...)
> CREATE TABLE B_sink (...)
> CREATE TABLE C_source(...)
> CREATE TABLE D_sink (...)
> INSERT INTO B_sink
> SELECT
>  1
> FROM
> A_source
> ;
>
>
> INSERT INTO C_sink
> SELECT
>  1
> FROM
> D_source
> ;
> and submitted it based on the savepoint; the result was:
>
> Cannot map checkpoint/savepoint state for operator
> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
> is not available in the new program.
> If you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.
>
>
> Could someone explain what, under the hood, causes the operator not to match?


Re: flink JobManager HA exception: the fencing token is null

2020-06-09 Thread whirly
Flink 1.8




whirly
Email: whir...@163.com

Signature customized by NetEase Mail Master

On 2020-06-09 21:15, tison wrote:
Which Flink version? After the big Dispatcher rework in 1.10 this should no longer be null.

Best,
tison.


whirly wrote on Tue, Jun 9, 2020 at 8:58 PM:

> Hi all:
> Some jobs in our environment hit the exception below. We suspect it is a problem with the Flink JobManager HA configuration. What could be the cause?
>
>
> Exception message:
> Internal server error.,
>  side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>  Fencing token not set: Ignoring message
> LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
> LocalRpcInvocation(requestMultipleJobDetails(Time)))
> sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because the
> fencing token is null.
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> End of exception on server side>


Re: flink JobManager HA exception: the fencing token is null

2020-06-09 Thread tison
Could you describe the scenario in more detail? Thinking it over, this is probably because the leader election window on your side is too long.

0. At some point, the Dispatcher is elected leader and publishes its address
1. Some component sends a message to the Dispatcher; in your case, after a click in the frontend, the WebMonitor sends a
requestMultipleJobDetails
message to the Dispatcher
2. The Dispatcher's connection to ZooKeeper flaps and it loses leadership. Earlier versions set the fencing token to null in that case
3. The message from step 1 arrives at the Dispatcher, which runs the fencing-token check and sees null
4. This exception is thrown

If leadership is regained later, the exception here would instead be something like a fencing token mismatch

Best,
tison.


tison wrote on Tue, Jun 9, 2020 at 9:15 PM:

> Which Flink version? After the big Dispatcher rework in 1.10 this should no longer be null.
>
> Best,
> tison.
>
>
> whirly wrote on Tue, Jun 9, 2020 at 8:58 PM:
>
>> Hi all:
>> Some jobs in our environment hit the exception below. We suspect it is a problem with the Flink JobManager HA configuration. What could be the cause?
>>
>>
>> Exception message:
>> Internal server error.,
>> > side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>>  Fencing token not set: Ignoring message
>> LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
>> LocalRpcInvocation(requestMultipleJobDetails(Time)))
>> sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because
>> the fencing token is null.
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> End of exception on server side>
>
>


Re: flink JobManager HA exception: the fencing token is null

2020-06-09 Thread tison
Which Flink version? After the big Dispatcher rework in 1.10 this should no longer be null.

Best,
tison.


whirly wrote on Tue, Jun 9, 2020 at 8:58 PM:

> Hi all:
> Some jobs in our environment hit the exception below. We suspect it is a problem with the Flink JobManager HA configuration. What could be the cause?
>
>
> Exception message:
> Internal server error.,
>  side:\norg.apache.flink.runtime.rpc.exceptions.FencingTokenException:
>  Fencing token not set: Ignoring message
> LocalFencedMessage(93428232915e7c6e4947aaa5910341a8,
> LocalRpcInvocation(requestMultipleJobDetails(Time)))
> sent to akka.tcp://flink@cluster2-host1:40443/user/dispatcher because the
> fencing token is null.
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> End of exception on server side>


Re: Failed to deserialize Avro record

2020-06-09 Thread Dawid Wysakowicz
Good to hear.

There is no schema that would support all ways. I would also rather
discourage such approach, as it makes it really hard to make changes to
the records schema. I would strongly recommend using schema registry for
all records.

If you still want to have a schema that would work for both you could
implement one based on both ConfluentRegistryAvroDeSerializationSchema
and AvroRowDeserialization which would check for the magic byte. If the
magic byte is present deserialize with
ConfluentRegistryAvroDeSerializationSchema. If it does not with the
AvroRowDeserialization. But again I'd rather discourage such approach.

Best,

Dawid
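A rough sketch of that combined schema, purely as an illustration: it assumes both delegates produce the same type T (for Row, the registry-aware delegate would be a custom schema like the POC mentioned in this thread), and the magic-byte check is only a heuristic, since a plain Avro record may in principle also start with a zero byte:

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class MagicByteDispatchingDeserializationSchema<T> implements DeserializationSchema<T> {

    // Confluent wire format: 0x0 magic byte, 4-byte schema id, then the Avro payload
    private static final byte CONFLUENT_MAGIC_BYTE = 0x0;

    private final DeserializationSchema<T> registryAwareSchema; // e.g. wraps ConfluentRegistryAvroDeserializationSchema
    private final DeserializationSchema<T> plainAvroSchema;     // e.g. AvroRowDeserializationSchema

    public MagicByteDispatchingDeserializationSchema(
            DeserializationSchema<T> registryAwareSchema,
            DeserializationSchema<T> plainAvroSchema) {
        this.registryAwareSchema = registryAwareSchema;
        this.plainAvroSchema = plainAvroSchema;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        if (message != null && message.length > 5 && message[0] == CONFLUENT_MAGIC_BYTE) {
            return registryAwareSchema.deserialize(message);
        }
        return plainAvroSchema.deserialize(message);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return plainAvroSchema.getProducedType();
    }
}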

On 09/06/2020 14:21, Ramana Uppala wrote:
> Hi Arvid / Dawid,
>
> Yes we did small POC with custom Avro Row Deserializer which uses 
> ConfluentRegistryAvroDeSerializationSchema and we are able to parse the 
> message.
>
> We have Schema registry and users are given choice to produce with different 
> serialization mechanisms. Some messages we are able to parse with 
> "AvroRowDeserializationSchema" some we couldn't. Our understanding is that 
> failed messages topics are produced with confluent serialization.
>
> Is there any uniform AvroRowDeserialization that works with all scenarios ?
>
> On 2020/06/09 11:03:23, Arvid Heise  wrote: 
>> If data is coming from Kafka, the write schema is most likely stored in a
>> Schema Registry. If so, you absolutely need to use
>> ConfluentRegistryAvroSerializationSchema of the
>> *flink-avro-confluent-registry* package.
>>
>> If you didn't opt for that most common architecture pattern, then you often
>> run into that the write schema and the supplied schema do not match. That
>> could also be the case here (but try the other serialization schema first).
>> If it still prevails, please elaborate how you manage the schema. It's also
>> helpful to see an example record and the schema if possible.
>>
>> On Tue, Jun 9, 2020 at 11:10 AM Dawid Wysakowicz 
>> wrote:
>>
>>> It's rather hard to help if we don't know the format in which the
>>> records are serialized. There is a significant difference if you use a
>>> schema registry or not. All schema registries known to me prepend the
>>> actual data with some kind of magic byte and an identifier of the
>>> schema. Therefore if we do not know to expect that we cannot properly
>>> deserialize the record.
>>>
>>> Nevertheless I would not say the problem has something to do with schema
>>> registry. If I understand you correctly some records can be
>>> deserialized. If they were produced with the schema registry type of
>>> serialization all would fail.
>>>
>>> What I can recommend is to try to log/identify a record that cannot be
>>> deserialized and check debug the AvroRowDeserializationSchema with it.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 06/06/2020 16:27, Ramana Uppala wrote:
 We are using AvroRowDeserializationSchema with Kafka Table source to
>>> deserialize the messages. Application failed with "Failed to deserialize
>>> Avro record." for different messages it seems.
 Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length
>>> is negative: -26
 Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
   at
>>> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>>> ~[avro-1.8.2.jar:1.8.2]
 We are not sure what the serialization mechanism producer is using to
>>> publish the messages at this time. But above errors are related to
>>> https://issues.apache.org/jira/browse/FLINK-16048 ?
 Any suggestions on fixing above issues ? we are using Flink 1.10
>>>
>> -- 
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>





flink JobManager HA exception: the fencing token is null

2020-06-09 Thread whirly
Hi all:
Some jobs in our environment hit the exception below. We suspect it is a problem with the Flink JobManager HA configuration. What could be the cause?


Exception message:
Internal server error.,


Re: Could someone share a detailed demo of the new OrcBulkWriterFactory in Flink?

2020-06-09 Thread Yun Gao
Hello,

For examples, you can take a look at the Flink documentation [1] and the related test case [2].

   Does the job you are currently running have checkpointing enabled? Also, is it an unbounded streaming job or a bounded one?



 [1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html
 (look for the Orc Format section; that doc page's formatting is currently broken and the community is fixing it)
 [2]  
https://github.com/apache/flink/blob/master/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java
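For context: the ORC writer is a bulk format, and bulk formats only finalize part files when a checkpoint completes, so an unbounded job without checkpointing will keep the data in in-progress files. A minimal sketch (the interval is only an example):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// bulk-encoded StreamingFileSink formats (Parquet, ORC) roll part files on checkpoints
env.enableCheckpointing(60_000); // e.g. one checkpoint per minute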


--
Sender:夏帅
Date:2020/06/08 17:28:08
Recipient:user-zh
Theme:Could someone share a detailed demo of the new OrcBulkWriterFactory in Flink?

When I use it, files do get created, but they contain no data.


Re: How can StreamingFileSink write data to another HA Hadoop cluster, as a YARN job?

2020-06-09 Thread Yun Gao
Hello,

 Have you hit a concrete error so far? My understanding is that you need to configure some parameters in 
hdfs-site under the HADOOP_CONF_DIR on the machines where the Flink TMs run. You may also refer to this earlier question: [1]


   [1] 
http://apache-flink.147419.n8.nabble.com/How-to-write-stream-data-to-other-Hadoop-Cluster-by-StreamingFileSink-td792.html


--
Sender:苯鵝飛啊飛
Date:2020/06/09 20:11:58
Recipient:user-zh
Theme:How can StreamingFileSink write data to another HA Hadoop cluster, as a YARN job?

Hi, as in the subject:
How can StreamingFileSink write data to another HA Hadoop cluster, as a YARN job?


Thanks!

莫等闲 白了少年头
=
Mobile:18611696624
QQ:79434564





Re: [External Sender] Flink sql nested elements

2020-06-09 Thread Leonard Xu
Hi, Ramana

Happy to hear you've resolved your problem. If you could post your SQL, this 
question might get a quicker response.

Flink SQL is case sensitive by default, and there is an issue tracking this [1]. I think 
it makes sense to add a note about it to the SQL section of the docs.

Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-16175 



> On Jun 9, 2020, at 20:09, Ramana Uppala wrote:
> 
> It will be great if we can convert all query elements to lower case similar 
> to Hive.



Re: [ANNOUNCE] Apache Flink Stateful Functions 2.1.0 released

2020-06-09 Thread Congxian Qiu
@Gordon thanks a lot for the release and for being the release manager.
Also thanks to everyone who made this release possible!

Best,
Congxian


Oytun Tez  于2020年6月9日周二 下午7:08写道:

> Thank you, Gordon and everyone.
>
> On Tue, Jun 9, 2020 at 5:56 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink Stateful Functions 2.1.0.
>>
>> Stateful Functions is an API that simplifies building distributed
>> stateful applications.
>> It's based on functions with persistent state that can interact
>> dynamically with strong consistency guarantees.
>>
>> Please check out the release blog post for an overview of the release:
>> https://flink.apache.org/news/2020/06/09/release-statefun-2.1.0.html
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Maven artifacts for Stateful Functions can be found at:
>> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>>
>> Python SDK for Stateful Functions published to the PyPI index can be
>> found at:
>> https://pypi.org/project/apache-flink-statefun/
>>
>> Official Docker image for building Stateful Functions applications is
>> currently being published to Docker Hub. Progress for creating the Docker
>> Hub repository can be tracked at:
>> https://github.com/docker-library/official-images/pull/7749
>>
>> In the meantime, before the official Docker images are available,
>> Ververica has volunteered to make Stateful Function's images available
>> for the community via their public registry:
>> https://hub.docker.com/r/ververica/flink-statefun
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12347861
>>
>> 
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Cheers,
>> Gordon
>>
> --
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>


Re: How to pass global variables in Flink

2020-06-09 Thread 1048262223
Hi


You can load such configuration in a rich function's open() method, or distribute it via broadcast.


Best,
Yichao Yang




----
From: "zjfpla...@hotmail.com"

How to pass global variables in Flink

2020-06-09 Thread zjfpla...@hotmail.com
hi,
How can global variables be passed around in Flink? Static classes don't seem to work when running on the server side.
The scenario: when the Flink program starts up, it reads configuration entries from a config file. These entries may be used in sinks, sources and elsewhere, so they are effectively global configuration for the whole program.



zjfpla...@hotmail.com


How can StreamingFileSink write data to another HA Hadoop cluster, as a YARN job?

2020-06-09 Thread ???Z?w???w

Hi, as in the subject: how can StreamingFileSink write data to another HA Hadoop cluster, as a YARN job?


??

?? ??
=
Mobile??18611696624
QQ:79434564




Re: Stopping a job

2020-06-09 Thread Kostas Kloudas
Hi all,

Just for future reference, there is an ongoing discussion on the topic at
another thread found in [1].
So please post any relevant comments there :)

Cheers,
Kostas

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Age-old-stop-vs-cancel-debate-td35514.html#a35615

On Tue, Jun 9, 2020 at 7:36 AM M Singh  wrote:

> Thanks Kostas, Arvid, and Senthil for your help.
>
> On Monday, June 8, 2020, 12:47:56 PM EDT, Senthil Kumar <
> senthi...@vmware.com> wrote:
>
>
> I am just stating this for completeness.
>
>
>
> When a job is cancelled, Flink sends an Interrupt signal to the Thread
> running the Source.run method
>
>
>
> For some reason (unknown to me), this does not happen when a Stop command
> is issued.
>
>
>
> We ran into some minor issues because of said behavior.
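A minimal sketch of a source written along the lines discussed here, assuming the periodic-fetch pattern described in this thread (the actual S3 fetch is left out as a placeholder); it exits promptly on cancel() via the flag and also copes with the interrupt that cancellation delivers:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PeriodicFetchSource implements SourceFunction<String> {

    private static final long SLEEP_SLICE_MS = 5_000L;  // short slices so the flag is re-checked often

    private final long fetchIntervalMs;
    private volatile boolean isRunning = true;

    public PeriodicFetchSource(long fetchIntervalMs) {
        this.fetchIntervalMs = fetchIntervalMs;
    }

    @Override
    public void run(SourceContext<String> ctx) {
        while (isRunning) {
            ctx.collect("fetched record");   // placeholder for the actual fetch from S3
            long slept = 0;
            while (isRunning && slept < fetchIntervalMs) {
                try {
                    Thread.sleep(SLEEP_SLICE_MS);
                } catch (InterruptedException e) {
                    // cancel() leads to an interrupt; restore the flag and re-check isRunning
                    Thread.currentThread().interrupt();
                    break;
                }
                slept += SLEEP_SLICE_MS;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}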
>
>
>
> *From: *Kostas Kloudas 
> *Date: *Monday, June 8, 2020 at 2:35 AM
> *To: *Arvid Heise 
> *Cc: *M Singh , User-Flink 
> *Subject: *Re: Stopping a job
>
>
>
> What Arvid said is correct.
>
> The only thing I have to add is that "stop" allows also exactly-once sinks
> to push out their buffered data to their final destination (e.g.
> Filesystem). In other words, it takes into account side-effects, so it
> guarantees exactly-once end-to-end, assuming that you are
> using exactly-once sources and sinks. Cancel with savepoint on the other
> hand did not necessarily and committing side-effects is was following a
> "best-effort" approach.
>
>
>
> For more information you can check [1].
>
>
>
> Cheers,
>
> Kostas
>
>
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212
> 
>
>
>
> On Mon, Jun 8, 2020 at 10:23 AM Arvid Heise  wrote:
>
> It was before I joined the dev team, so the following are kind of
> speculative:
>
>
>
> The concept of stoppable functions never really took off as it was a bit
> of a clumsy approach. There is no fundamental difference between stopping
> and cancelling on (sub)task level. Indeed if you look in the twitter source
> of 1.6 [1], cancel() and stop() are doing the exact same thing. I'd assume
> that this is probably true for all sources.
>
>
>
> So what is the difference between cancel and stop then? It's more the way
> on how you terminate the whole DAG. On cancelling, you cancel() on all
> tasks more or less simultaneously. If you want to stop, it's more a
> fine-grain cancel, where you stop first the sources and then let the tasks
> close themselves when all upstream tasks are done. Just before closing the
> tasks, you also take a snapshot. Thus, the difference should not be visible
> in user code but only in the Flink code itself (task/checkpoint coordinator)
>
>
>
> So for your question:
>
> 1. No, as on task level stop() and cancel() are the same thing on UDF
> level.
>
> 2. Yes, stop will be more graceful and creates a snapshot. [2]
>
> 3. Not that I am aware of. In the whole flink code base, there are no more
> (see javadoc). You could of course check if there are some in Bahir. But it
> shouldn't really matter. There is no huge difference between stopping and
> cancelling if you wait for a checkpoint to finish.
>
> 4. Okay you answered your second question ;) Yes cancel with savepoint =
> stop now to make it easier for new users.
>
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.6/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java#L180-L190
> 
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html
> 
>
>
>
> On Sun, Jun 7, 2020 at 1:04 AM M Singh  wrote:
>
>
>
> Hi Arvid:
>
>
>
> Thanks for the links.
>
>
>
> A few questions:
>
>
>
> 1. Is there any particular interface in 1.9+ that identifies the source as
> stoppable ?
>
> 2. Is there any distinction b/w stop and cancel  in 1.9+ ?
>
> 3. Is there any list of sources which are documented as stoppable 

Re: Stopping flink application with /jobs/:jobid/savepoints or /jobs/:jobid/stop

2020-06-09 Thread Kostas Kloudas
Hi Omkar,

For the first part of the question where you set the "drain" to false
and the state gets drained, this can be an issue on our side. Just to
clarify, no matter what is the value of the "drain", Flink always
takes a savepoint. Drain simply means that we also send MAX_WATERMARK
before taking the savepoint. Is this what you observe? I.e. that you
have an infinite input stream and even if you set drain to false, you
still see the MAX_WATERMARK?

For the second part of the question, the cancel-with-savepoint is a
deprecated command. But it is not removed for backwards compatibility.
So you can still have a cancel-with-savepoint in the way you
described. The difference between the deprecated cancel-with-savepoint
and the recommended stop-with-savepoint is that the
stop-with-savepoint guarantees that if you are using an exactly-once
sink, the side-effects are going to be committed to the sink before
the job exits. This was not the case for cancel-with-savepoint. For
more details, you can have a look at [1].

Cheers,
Kostas

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

On Tue, Jun 9, 2020 at 4:40 AM Deshpande, Omkar
 wrote:
>
> Hello,
>
> When I try to stop the job with /jobs/:jobid/stop REST endpoint, the state 
> gets drained, even if I pass {"drain":false} in the body of the post request. 
> Is the value of drain flag true by default? Why is not getting used when I 
> pass {"drain":false}?
>
> And I can also stop a job using this endpoint /jobs/:jobid/savepoints with 
> {"cancel-job":"true"} in the body. In this case there the state is not 
> drained. What is the difference between these 2 endpoints? Is there a reason 
> to use one over the other?
>
> If I want to stop a job with savepoint but without draining the state which 
> endpoint should be used?
>
> Omkar


Re: [ANNOUNCE] Apache Flink Stateful Functions 2.1.0 released

2020-06-09 Thread Oytun Tez
Thank you, Gordon and everyone.

On Tue, Jun 9, 2020 at 5:56 AM Tzu-Li (Gordon) Tai 
wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Stateful Functions 2.1.0.
>
> Stateful Functions is an API that simplifies building distributed stateful
> applications.
> It's based on functions with persistent state that can interact
> dynamically with strong consistency guarantees.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2020/06/09/release-statefun-2.1.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Stateful Functions can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>
> Python SDK for Stateful Functions published to the PyPI index can be found
> at:
> https://pypi.org/project/apache-flink-statefun/
>
> Official Docker image for building Stateful Functions applications is
> currently being published to Docker Hub. Progress for creating the Docker
> Hub repository can be tracked at:
> https://github.com/docker-library/official-images/pull/7749
>
> In the meantime, before the official Docker images are available,
> Ververica has volunteered to make Stateful Function's images available for
> the community via their public registry:
> https://hub.docker.com/r/ververica/flink-statefun
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12347861
>
> 
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Cheers,
> Gordon
>
-- 
 --

[image: MotaWord]
Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com

  


Re: Failed to deserialize Avro record

2020-06-09 Thread Arvid Heise
If data is coming from Kafka, the write schema is most likely stored in a
Schema Registry. If so, you absolutely need to use
ConfluentRegistryAvroSerializationSchema of the
*flink-avro-confluent-registry* package.

If you didn't opt for that most common architecture pattern, then you often
run into that the write schema and the supplied schema do not match. That
could also be the case here (but try the other serialization schema first).
If it still prevails, please elaborate how you manage the schema. It's also
helpful to see an example record and the schema if possible.
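A minimal sketch of the registry-aware setup described above; the topic name, registry URL, reader schema and Kafka properties are placeholders supplied by the caller:

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RegistryAwareConsumerFactory {

    // reads the topic with the write schemas fetched from the Confluent Schema Registry
    public static FlinkKafkaConsumer<GenericRecord> create(
            String topic, String registryUrl, Schema readerSchema, Properties kafkaProperties) {
        return new FlinkKafkaConsumer<>(
                topic,
                ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, registryUrl),
                kafkaProperties);
    }
}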

On Tue, Jun 9, 2020 at 11:10 AM Dawid Wysakowicz 
wrote:

> It's rather hard to help if we don't know the format in which the
> records are serialized. There is a significant difference if you use a
> schema registry or not. All schema registries known to me prepend the
> actual data with some kind of magic byte and an identifier of the
> schema. Therefore if we do not know to expect that we cannot properly
> deserialize the record.
>
> Nevertheless I would not say the problem has something to do with schema
> registry. If I understand you correctly some records can be
> deserialized. If they were produced with the schema registry type of
> serialization all would fail.
>
> What I can recommend is to try to log/identify a record that cannot be
> deserialized and check debug the AvroRowDeserializationSchema with it.
>
> Best,
>
> Dawid
>
> On 06/06/2020 16:27, Ramana Uppala wrote:
> > We are using AvroRowDeserializationSchema with Kafka Table source to
> deserialize the messages. Application failed with "Failed to deserialize
> Avro record." for different messages it seems.
> >
> > Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length
> is negative: -26
> >
> > Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
> >   at
> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
> ~[avro-1.8.2.jar:1.8.2]
> >
> > We are not sure what the serialization mechanism producer is using to
> publish the messages at this time. But above errors are related to
> https://issues.apache.org/jira/browse/FLINK-16048 ?
> >
> > Any suggestions on fixing above issues ? we are using Flink 1.10
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: KeyedStream and keyedProcessFunction

2020-06-09 Thread 1048262223
Hi


+1. Because there is no need to generate an instance for each key, Flink just 
maintains the collection of keys within one instance. Imagine what would happen if the 
number of keys were unlimited.



Best,
Yichao Yang




---- Original message ----
From: "Tzu-Li (Gordon) Tai"
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

????: ?????? ????flinksql between????

2020-06-09 Thread wangweigu...@stevegame.cn

  1.10 useBlinkPlanneruseOldPlanner
  
??
  Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 

LogicalProject(num=[$0])
  LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner])
FlinkLogicalDataStreamScan(id=[1], fields=[num])
FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])

This exception indicates that the query uses an unsupported SQL feature.




 
 ??
?? 2020-06-09 17:41
 user-zh
?? ?? flinksql between
hi
1 flink1.9.0
2 oldplanner


Re: KeyedStream and keyedProcessFunction

2020-06-09 Thread Tzu-Li (Gordon) Tai
Hi,

Records with the same key will be processed by the same partition.
Note there isn't an instance of a keyed process function for each key.
There is a single instance per partition, and all keys that are distributed
to the same partition will get processed by the same keyed process function
instance.

Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Will Flink state still be recognized after adding a field to the state class?

2020-06-09 Thread 1048262223
Hi


See the POJO section of the state schema evolution documentation [1].


[1]https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/state/schema_evolution.html#pojo-%E7%B1%BB%E5%9E%8B


Best,
Yichao Yang




----
From: "wangl...@geekplus.com.cn"

Blocked requesting MemorySegment when Segments are available.

2020-06-09 Thread David Maddison
Hi,

I keep seeing the following situation where a task is blocked getting a
MemorySegment from the pool but the TaskManager is reporting that it has
lots of MemorySegments available.

I'm completely stumped as to how to debug or what to look at next so any
hints/help/advice would be greatly appreciated!

/David/

The situation is as follows (Flink 1.10.0):

I have two operations, the first one "enrich-events" is stuck forever
attempting to get a memory segment to send to downstream operator "Test
function":

"read-events -> enriched-events (1/1)" #61 prio=5 os_prio=0
tid=0x7f6424091800 nid=0x13b waiting on condition [0x7f644acf]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xd2206000> (a
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
at
org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
at
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
at
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)


All the operator tasks are running on the same TaskManager and the
TaskManager reports that it has 6,517 memory segments available, so it's
confusing why the task would be blocked getting a memory segment.

Memory Segments
Type  Count
Available  6,517
Total  6,553

Even more confusing is that the downstream task appears to be waiting for
data and therefore I would assume that the credit based flow control isn't
causing the back pressure.

"Test Function (1/1)" #62 prio=5 os_prio=0 tid=0x7f6424094000 nid=0x13c
waiting on condition [0x7f644abf]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xc91de1d0> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)


Re: Re: Can Flink state still be recognized after adding a field?

2020-06-09 Thread 1048262223
Hi


You can check with TypeInformation a = TypeInformation.of(OrderState.class); whether the
class is recognized as a POJO; if it is a POJO, the state can be restored.


Best,
Yichao Yang




------ Original message ------
From: "wangl...@geekplus.com.cn"

??????????????????

2020-06-09 Thread 1048262223
Hi


??jobmysql??

??jobmysql??job





Best,
Yichao Yang




----
??:"wangxiangyan"

Re: Re: Can Flink state still be recognized after adding a field?

2020-06-09 Thread wangl...@geekplus.com.cn

OrderState 
 get set ?? 

 flink ??OrderState??



wang...@geekplus.com.cn

 1048262223
?? 2020-06-09 18:11
 user-zh
?? ??Flink State ?? state 
Hi
 
 
flink??OrderStatepojo??savepoint??
 
Best,
Yichao Yang
 
 
 
 
----
??:"wangl...@geekplus.com.cn"

Re: Can Flink state still be recognized after adding a field?

2020-06-09 Thread 1048262223
Hi


If OrderState is a type that Flink recognizes as a POJO, restoring it from the savepoint works.

Best,
Yichao Yang




----
??:"wangl...@geekplus.com.cn"

Can Flink state still be recognized after adding a field?

2020-06-09 Thread wangl...@geekplus.com.cn

I wrote a simple class that is used in Flink state:

public class OrderState {
private Integer warehouseId;
private String orderNo;
private String ownerCode;
private Long inputDate;
private int orderType;
private int amount = 0;
private int status = 0;
.
}


Now the program needs to be upgraded and a new field has to be added to this class. Can the state still be restored correctly?
That is, after flink run -s   savepointdir   will the state saved by the old code still be recognized?

Thanks,
Wang Lei



wangl...@geekplus.com.cn
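
For reference, a minimal sketch of the evolved class, assuming OrderState meets Flink's
POJO rules (public no-argument constructor plus public fields or getters/setters); the
POJO serializer then supports adding a field, and the new field is filled with its default
value when restoring from the old savepoint. The added field name below is made up.

public class OrderState {
    private Integer warehouseId;
    private String orderNo;
    private String ownerCode;
    private Long inputDate;
    private int orderType;
    private int amount = 0;
    private int status = 0;
    // newly added field; entries restored from the old savepoint get its default value
    private String carrierCode;

    public OrderState() {
        // no-argument constructor required for POJO serialization
    }

    // getters and setters for every field omitted for brevity
}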


Re: Handling late events

2020-06-09 Thread wangxiangyan
1. The statistics are displayed from MySQL and the windows are computed on event time.
With processing time I would have to work out which window each record belongs to in order to update the external system, and the front-end page might still not get the statistics for the most recent minutes; at that point event time is delayed just the same, but semantically processing time is harder to explain than event time.


2. Showing real-time data during the day and correcting the whole day's data at night consumes a lot of resources; I am not sure how often this kind of delay occurs, but it probably is not frequent.


Perhaps the more reasonable logic is to collect the late data and start a separate streaming job that keeps consuming it, talks to MySQL, adds the newly computed metrics to the ones previously computed for each window, and then updates them.
------ Original ------
From: "1048262223" <1048262...@qq.com>
Date: Tue, Jun 9, 2020 05:40 PM
To: "user-zh"

[ANNOUNCE] Apache Flink Stateful Functions 2.1.0 released

2020-06-09 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Stateful Functions 2.1.0.

Stateful Functions is an API that simplifies building distributed stateful
applications.
It's based on functions with persistent state that can interact dynamically
with strong consistency guarantees.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2020/06/09/release-statefun-2.1.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at:
https://pypi.org/project/apache-flink-statefun/

Official Docker image for building Stateful Functions applications is
currently being published to Docker Hub. Progress for creating the Docker
Hub repository can be tracked at:
https://github.com/docker-library/official-images/pull/7749

In the meantime, before the official Docker images are available,
Ververica has volunteered to make Stateful Function's images available for
the community via their public registry:
https://hub.docker.com/r/ververica/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347861

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: Question about BETWEEN in Flink SQL

2020-06-09 Thread ??????
hi
1. Flink 1.9.0
2. old planner


sanity checking in ProcessWindowFunction.process shows that event timestamps are out of tumbling window time range

2020-06-09 Thread Yu Yang
Hi all,

We are writing an application that set TimeCharacteristic.EventTime as time
characteristic. When we implement the ProcessWindowFunction for a
TumblingWindow, we added code as below to check if the timestamp of events
is in the tumbling window time range. To our surprise, we found that the
process function reports processing events that are not in the tumbling
window time range. Any insights on how this happens?  We are using Flink
1.9.1.

Below is the timestamp assigner, stream dag snippet and process function
implementation:

Timestamp assigner:

FlinkKafkaConsumerBase<Event> source =
consumer.assignTimestampsAndWatermarks(

   new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(60)) {

 @Override

 public long extractTimestamp(Event element) {

   return element.getTimestamp();

 }

   });


The stream dag of our application:

env.addSource(source)

   .filter(new EventFilter(events))

.keyBy(new KeySelector<Event, EventStreamPartitionKey>() {

@Override

public EventStreamPartitionKey  getKey(Event value)

   throws Exception {

return new EventStreamPartitionKey(value.getHost());

   }

}).window(TumblingEventTimeWindows.of(Time.seconds(60)))

   .process(new EventValidator())

   .addSink(kafkaProducer);

The implementation of  process window function EventValidator.process that
checks whether the event timestamp is in the tumbling window time range:

@Override
public void process(EventStreamPartitionKey key,
  Context context, Iterable<Event> elements,
Collector out) {

for(Event event : elements) {
if ( event.getTimestamp() >= context.window().getEnd() ||
   event.getTimestamp() < context.window().getStart() )

System.out.println("NOT in RANGE: " + context.window().getStart()

+ ", " + event.getTimestamp() + ", " + context.window().getEnd());
...

}
out.collect(res);
}



Thanks!


Regards,

-Yu


??????????????????

2020-06-09 Thread 1048262223
Hi


??
1.olap??olap
2. t - 1 
??


Best,
Yichao Yang




----
??:"wangxiangyan"

Re: pyflink data query

2020-06-09 Thread Jeff Zhang
You can use Zeppelin's z.show to query the job result. Here is a getting-started tutorial for pyflink on Zeppelin:
https://www.bilibili.com/video/BV1Te411W73b?p=20
You can also join the DingTalk group for discussion: 30022475



jack wrote on Tue, Jun 9, 2020 at 5:28 PM:

> A question:
> Description: with pyflink I run SQL queries and aggregations on data from a source.
> Instead of writing the result to a sink, I would like to use it directly as the result, so that I can expose a web API that queries it without having to read it back from a sink.
>
> Can Flink support this?
> Thanks
>


-- 
Best Regards

Jeff Zhang




Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-06-09 Thread Aljoscha Krettek

Hi,

I agree with Robert that adding open/close support for partitioners 
would mean additional complexity in the code base. We're currently not 
thinking of supporting that.


Best,
Aljoscha

On 05.06.20 20:19, Arvid Heise wrote:

Hi Arnaud,

just to add up. The overhead of this additional map is negligible if you
enable object reuse [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
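
For reference, enabling it is a one-line change on the execution config (sketch; only safe
if your functions do not hold on to or mutate input records after emitting them):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// reuse record objects between operators instead of deep-copying them
env.getConfig().enableObjectReuse();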

On Tue, Jun 2, 2020 at 10:34 AM Robert Metzger  wrote:


I'm not 100% sure about this answer, that's why I'm CCing Aljoscha to
correct me if needed:

Partitioners are not regular operators (like a map or window), thus they
are not included in the regular Task lifecycle methods (of open() / map()
etc. / close(), with the proper error handling, task cancellation
mechanisms etc.). The custom partition function is called somewhere close
to the network stack.
It would be quite a lot of effort (and added complexity to the codebase)
to allow for rich partitioners. Given that custom partitioners are a rarely
used feature, it would not be justified to spend a lot of time for this
(there's also a good workaround available)


On Fri, May 29, 2020 at 2:46 PM LINZ, Arnaud 
wrote:


Hello,



Yes, that would definitely do the trick, with an extra mapper after keyBy
to remove the tuple so that it stays seamless. It’s less hacky that what I
was thinking of, thanks!

However, is there any plan in a future release to have rich partitioners
? That would avoid adding  overhead and “intermediate” technical info in
the stream payload.

Best,

Arnaud



*De :* Robert Metzger 
*Envoyé :* vendredi 29 mai 2020 13:10
*À :* LINZ, Arnaud 
*Cc :* user 
*Objet :* Re: Best way to "emulate" a rich Partitioner with open() and
close() methods ?



Hi Arnaud,



Maybe I don't fully understand the constraints, but what about

stream.map(new GetKuduPartitionMapper).keyBy(0).addSink(KuduSink());


The map(new GetKuduPartitionMapper) will be a regular RichMapFunction
with open() and close() where you can handle the connection with Kudu's
partitioning service.

The map will output a Tuple2 (or something nicer :) ),
then Flink shuffles your data correctly, and the sinks will process the
data correctly partitioned.
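
A rough sketch of that wiring (the Kudu client wrapper, record type and sink constructor
below are placeholders, not actual Kudu or Flink APIs):

public static class GetKuduPartitionMapper
        extends RichMapFunction<MyRecord, Tuple2<Integer, MyRecord>> {

    private transient KuduPartitionClient client; // placeholder for Kudu's partitioning service

    @Override
    public void open(Configuration parameters) {
        client = KuduPartitionClient.connect("kudu-master:7051"); // opened once per subtask
    }

    @Override
    public Tuple2<Integer, MyRecord> map(MyRecord value) {
        int kuduPartition = client.partitionFor(value.getKey()); // placeholder call
        return Tuple2.of(kuduPartition, value);
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}

// Flink then shuffles on the pre-computed partition id, and an extra mapper drops the tuple:
stream.map(new GetKuduPartitionMapper())
      .keyBy(t -> t.f0)
      .map(t -> t.f1)
      .addSink(new KuduSink<>(kuduConfig));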



I hope that this is what you were looking for!



Best,

Robert



On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud 
wrote:

Hello,



I would like to upgrade the performance of my Apache Kudu Sink by using
the new “KuduPartitioner” of Kudu API to match Flink stream partitions with
Kudu partitions to lower the network shuffling.

For that, I would like to implement something like

stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new
KuduSink(…)));

With KuduFLinkPartitioner a implementation of 
org.apache.flink.api.common.functions.Partitioner
that internally make use of the KuduPartitioner client tool of Kudu’s API.



However for that KuduPartioner to work, it needs to open – and close at
the end – a connection to the Kudu table – obviously something that can’t
be done for each line. But there is no “AbstractRichPartitioner” with
open() and close() method that I can use for that (the way I use it in the
sink for instance).



What is the best way to implement this ?

I thought of ThreadLocals that would be initialized during the first call
to *int* partition(K key, *int* numPartitions);  but I won’t be able to
close() things nicely as I won’t be notified on job termination.



I thought of putting those static ThreadLocals inside a “Identity Mapper”
that would be called just prior the partition with something like :

stream.map(richIdentiyConnectionManagerMapper).partitionCustom(new
KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…)));

with kudu connections initialized in the mapper open(), closed in the
mapper close(), and used  in the partitioner partition().

However It looks like an ugly hack breaking every coding principle, but
as long as the threads are reused between the mapper and the partitioner I
think that it should work.



Is there a better way to do this ?



Best regards,

Arnaud






--


L'intégrité de ce message n'étant pas assurée sur internet, la société
expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
vous n'êtes pas destinataire de ce message, merci de le détruire et
d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The
company that sent this message cannot therefore be held liable for its
content nor attachments. Any unauthorized use or dissemination is
prohibited. If you are not the intended recipient of this message, then
please delete it and notify the sender.








Re: Re: Handling late events

2020-06-09 Thread wangweigu...@stevegame.cn

For the historical data produced while a detector was offline, batch processing is probably the better fit; it avoids the problems caused by an overly large backlog of data.


 
From: wangxiangyan
Sent: 2020-06-09 17:26
To: user-zh
Subject: Re: Handling late events
This is a system that needs to display statistics in real time. The data comes from detectors installed at customer sites; a detector may go offline or deliver data late, and the offline duration is unpredictable. When a detector that went offline comes back the next day, it sends a batch of the previous day's data, so late data has to be handled.


------ Original ------
From: "1048262223" <1048262...@qq.com>
Date: Tue, Jun 9, 2020 05:14 PM
To: "user-zh"

pyflink data query

2020-06-09 Thread jack
A question:
Description: with pyflink I run SQL queries and aggregations on data from a source.
Instead of writing the result to a sink, I would like to use it directly as the result, so that I can expose a web API that queries it without having to read it back from a sink.


Can Flink support this?
Thanks

??????????????????????????????????

2020-06-09 Thread kcz
sorry??




----
??:"1048262223"<1048262...@qq.com;
:2020??6??9??(??) 5:07
??:"user-zh"


Re: Handling late events

2020-06-09 Thread wangxiangyan
This is a system that needs to display statistics in real time. The data comes from detectors installed at customer sites; a detector may go offline or deliver data late, and the offline duration is unpredictable. When a detector that went offline comes back the next day, it sends a batch of the previous day's data, so late data has to be handled.


------ Original ------
From: "1048262223" <1048262...@qq.com>
Date: Tue, Jun 9, 2020 05:14 PM
To: "user-zh"

Re: Flink SQL UDF dynamic types

2020-06-09 Thread Jingsong Li
Hi all,

In practice you can usually avoid dynamically typed UDFs, but if you really need them, 1.11 already supports this [1]; the documentation is still on its way. A simple example that derives the return type from the first argument:

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
   return TypeInference.newBuilder()
  .outputTypeStrategy(TypeStrategies.argument(0))
  .build();
}


[1]https://issues.apache.org/jira/browse/FLINK-15487

Best,
Jingsong Lee

On Tue, Jun 9, 2020 at 4:57 PM kcz <573693...@qq.com> wrote:

> Makes sense, I'll just use map then. Thanks.
>
> ------ Original message ------
> From: "1048262223" <1048262...@qq.com>
> Sent: Tuesday, June 9, 2020, 4:51 PM
> To: "user-zh"
> Subject: Re: Flink SQL UDF dynamic types
>
> Hi
>
> This can be done by returning a generic type such as Map.
>
> Best,
> Yichao Yang
>
> ------ Original message ------
> From: "kcz" <573693...@qq.com>
> Sent: Tuesday, June 9, 2020, 4:49 PM
> To: "user-zh"
> Subject: Re: Flink SQL UDF dynamic types
>
> We actually need dynamic types as well. For example, when I write a
> column-to-row UDF I have to fix the number and the types of the returned
> fields, and whenever a field is added I need yet another UDF.
>
> ------ Original message ------
> From: "Benchao Li"
> Sent: Tuesday, June 9, 2020, 2:47 PM
> To: "user-zh"
> Subject: Re: Flink SQL UDF dynamic types
>
> I also feel this kind of scenario is better handled with a pb format; in
> fact that is how we use it internally. Our format implementation takes the
> compiled pb class and derives the table schema from it. The main work is
> defining the mapping from every pb type to a Flink type.
>
> We have also been thinking about discussing in the community whether a pb
> format should be supported. If you need it as well, we can open a JIRA
> first and then discuss the concrete requirements and implementation there.
>
> 1048262223 <1048262...@qq.com> wrote on Tue, Jun 9, 2020 at 2:23 PM:
>
> > Hi
> >
> > We do use pb as the source data. Our approach is to parse a
> > TypeInformation from the pb schema (descriptor) when the program
> > initializes, and then pass that TypeInformation to
> > env.addSource().returns(); it can describe any dynamic type.
> >
> > Your case is a UDF, though. From the UDF example you posted I understand
> > that you want dynamically typed output, but I do not know your actual
> > scenario or why one UDF should produce different result types. For ease
> > of management, readability and maintainability of UDFs, I think it is
> > better to fix the output type.
> >
> > Please correct me if I misunderstood anything.
> >
> > Best,
> > Yichao Yang
> >
> > ------ Original message ------
> > From: "forideal"
> > Sent: Tuesday, June 9, 2020, 1:33 PM
> > To: "user-zh"
> > Subject: Flink SQL UDF dynamic types
> >
> > Hello, my friends:
> >
> > I am using Flink 1.10 with the Blink planner.
> > I want to build a Flink UDF that returns different types depending on
> > its parameters.
> >
> > Why I want this:
> > Scenario 1: my data is a pb byte array and I want to extract values from
> > it. If the UDF always returns STRING, I have to add casts everywhere; if
> > I use get_int / get_double / get_string style functions, there are far
> > too many implementations to maintain.
> > Scenario 2: my data is JSON, same problem as above.
> >
> > For scenario 1 I patched the Flink source code and added an
> > initialization hook to ScalarFunction that is called when Flink
> > initializes the scalar function:
> >
> > @Override
> > public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
> >     // do some work here, e.g. look up the pb schema by paramNames and
> >     // obtain the type information, so the return type can be set
> >     // dynamically
> > }
> >
> > This worked well as a workaround for quite a while and still works today;
> > it is just not very elegant.
> >
> > The case below is what I would like to write, but currently it returns
> > RAW('java.lang.Object', ?), which cannot be used directly without a cast:
> >
> > public class TimestampTest extends ScalarFunction {
> >
> >     public Object eval(long timestamp, String pattern, int num) {
> >         Timestamp timestamp1 = new Timestamp(timestamp);
> >         SimpleDateFormat sdf = new SimpleDateFormat(pattern);
> >         if (num < 4) {
> >             // return STRING
> >             return String.valueOf(timestamp);
> >         }
> >         if (num < 6) {
> >             // return BIGINT
> >             return timestamp - 100;
> >         }
> >         if (num < 8) {
> >             // return DOUBLE
> >             double ss = 0.9;
> >             return (double) timestamp + ss;
> >         }
> >         // return STRING
> >         return sdf.format(timestamp1);
> >     }
> > }



-- 
Best, Jingsong Lee
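
For the Map workaround mentioned in the quoted discussion, a minimal sketch on Flink 1.10
(the parsing logic and field names are made up; getResultType is the legacy way of
declaring the MAP return type explicitly):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;

public class PbFieldsToMap extends ScalarFunction {

    public Map<String, String> eval(byte[] payload) {
        Map<String, String> fields = new HashMap<>();
        // parse the pb/JSON payload here and put every field in as a string (placeholder)
        fields.put("example_field", "example_value");
        return fields;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        // declare the return type explicitly so the planner sees MAP<STRING, STRING>
        return Types.MAP(Types.STRING, Types.STRING);
    }
}

// In SQL the caller then casts per field, e.g.
//   SELECT CAST(fields['user_id'] AS BIGINT)
//   FROM (SELECT PbFieldsToMap(payload) AS fields FROM t)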


Re: Re: flink 1.10 on YARN issue

2020-06-09 Thread wangweigu...@stevegame.cn

Yes: in yarn-session mode, if no job has been submitted to the yarn-session's application id, no slots or memory are allocated at all.


 
From: Benchao Li
Sent: 2020-06-08 18:26
To: user-zh
Subject: Re: Re: flink 1.10 on YARN issue
The yarn-session mode should be lazy: it does not actually request containers until you submit a job.
 
小屁孩 <932460...@qq.com> wrote on Mon, Jun 8, 2020 at 6:23 PM:
 
>
> That problem is solved; it was my own CDH namenode that had failed to start. I now have one more question:
>
> Command: ./../bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -d -s 2
> With this command Flink should request 2 slots; why does the YARN UI not show any slot count?
>
>
> Could you help me with this? Thanks.
> ------ Original message ------
> *From:* "小屁孩" <932460...@qq.com>;
> *Sent:* Monday, June 8, 2020, 4:06 PM
> *To:* "user-zh";
> *Subject:* Re: Re: flink 1.10 on YARN issue
>
> hello
> Command: ./yarn-session.sh -n 8 -jm 1024 -tm 1024 -s 4 -nm FlinkOnYarnSession -d
> Version: Flink 1.10.0, CDH 5.14
> When using Flink on YARN in yarn-session mode I get the following error:
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn session cluster
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:380)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:548)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
> Caused by: java.net.ConnectException: Call From master/192.168.1.20 to
> slave1:8020 failed on connection exception: java.net.ConnectException:
> Connection refused; For more details see:
> http://wiki.apache.org/hadoop/ConnectionRefused
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:791)
> at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:731)
> at org.apache.hadoop.ipc.Client.call(Client.java:1474)
> at org.apache.hadoop.ipc.Client.call(Client.java:1401)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy13.getFileInfo(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
> at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:496)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:348)
> at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
> at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1907)
> at org.apache.flink.yarn.Utils.uploadLocalFileToRemote(Utils.java:172)
> at org.apache.flink.yarn.Utils.setupLocalResource(Utils.java:126)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.setupSingleLocalResource(YarnClusterDescriptor.java:1062)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.uploadAndRegisterFiles(YarnClusterDescriptor.java:1144)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:707)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:488)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:373)
> ... 7 more
> Caused by: java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
> at 

Handling late events

2020-06-09 Thread wangxiangyan
hi, I have a question for everyone.

The requirement is to process data per minute. The data source is unstable and may go offline for a while, so for example the next day a large amount of the previous day's data floods in. Possible options are:
1. Late-data handling: route the late data to separate logic that interacts with the external system; but does the allowed-lateness state retention then have to be tuned up to a whole day?
2. Run a scheduled batch job every night to recompute and correct the day's data.

Which approach should be used, or is there a better way to handle this?
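
For option 1, a sketch using the DataStream API's allowed lateness plus a side output for
records that arrive even later (the event type, key selector and aggregate function are
placeholders):

OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Stat> stats = events
    .keyBy(Event::getDetectorId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.hours(24))   // keeps window state around for one extra day
    .sideOutputLateData(lateTag)       // anything later than that goes to the side output
    .aggregate(new StatAggregate());   // placeholder AggregateFunction

DataStream<Event> tooLate = stats.getSideOutput(lateTag);
// tooLate can then go through a separate path that corrects the MySQL results directly.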

??????????????????

2020-06-09 Thread 1048262223
Hi



??flink


Best,
Yichao Yang




----
??:"wangxiangyan"

Re: Failed to deserialize Avro record

2020-06-09 Thread Dawid Wysakowicz
It's rather hard to help if we don't know the format in which the
records are serialized. There is a significant difference if you use a
schema registry or not. All schema registries known to me prepend the
actual data with some kind of magic byte and an identifier of the
schema. Therefore if we do not know to expect that we cannot properly
deserialize the record.

Nevertheless I would not say the problem has something to do with schema
registry. If I understand you correctly some records can be
deserialized. If they were produced with the schema registry type of
serialization all would fail.

What I can recommend is to try to log/identify a record that cannot be
deserialized and then debug the AvroRowDeserializationSchema with it.
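
One way to do that (a hand-written wrapper, not an existing Flink class) is to decorate the
schema at the DataStream level and dump the raw bytes of any record the inner schema
rejects, so the offending message can be replayed in a unit test:

import java.io.IOException;
import java.util.Base64;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

public class LoggingDeserializationSchema implements DeserializationSchema<Row> {

    private final DeserializationSchema<Row> inner;

    public LoggingDeserializationSchema(DeserializationSchema<Row> inner) {
        this.inner = inner;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        try {
            return inner.deserialize(message);
        } catch (Exception e) {
            // log the payload so it can be inspected offline
            System.err.println("Bad record (base64): " + Base64.getEncoder().encodeToString(message));
            throw e;
        }
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return inner.getProducedType();
    }
}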

Best,

Dawid

On 06/06/2020 16:27, Ramana Uppala wrote:
> We are using AvroRowDeserializationSchema with Kafka Table source to 
> deserialize the messages. Application failed with "Failed to deserialize Avro 
> record." for different messages it seems.
>
> Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is 
> negative: -26
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
>   at 
> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424) 
> ~[avro-1.8.2.jar:1.8.2]
>
> We are not sure what the serialization mechanism producer is using to publish 
> the messages at this time. But above errors are related to 
> https://issues.apache.org/jira/browse/FLINK-16048 ?
>
> Any suggestions on fixing above issues ? we are using Flink 1.10






??????????????????????????????????

2020-06-09 Thread 1048262223
Hi


??


Best
Yichao Yang




----
??:"kcz"<573693...@qq.com;
:2020??6??9??(??) 5:06
??:"user-zh"

????????????????????????????

2020-06-09 Thread kcz
join??open
??

Re: Run command after Batch is finished

2020-06-09 Thread Mark Davis
Hi Chesnay,

That is an interesting proposal, thank you!
I was doing something similar with the OutputFormat#close() method respecting 
the Format's parallelism. Using FinalizeOnMaster will make things easier.
But the problem is that several OutputFormats must be synchronized externally - 
every output must check whether other outputs finished already... Quite 
cumbersome.
Also there is a problem with exceptions - the OutputFormats may never be opened
and never closed.

  Mark
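
A minimal sketch of the FinalizeOnMaster approach suggested below, wrapping an existing
OutputFormat; the lock client at the end is a placeholder for whatever coordination
service is used:

import java.io.IOException;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

public class LockReleasingOutputFormat<T> implements OutputFormat<T>, FinalizeOnMaster {

    private final OutputFormat<T> delegate;

    public LockReleasingOutputFormat(OutputFormat<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void configure(Configuration parameters) {
        delegate.configure(parameters);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        delegate.open(taskNumber, numTasks);
    }

    @Override
    public void writeRecord(T record) throws IOException {
        delegate.writeRecord(record);
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        // runs once on the master after all parallel instances of this sink have been closed
        DistributedLock.release("my-batch-lock"); // placeholder for your lock client
    }
}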

‐‐‐ Original Message ‐‐‐
On Monday, June 8, 2020 5:50 PM, Chesnay Schepler  wrote:

> This goes in the right direction; have a look at 
> org.apache.flink.api.common.io.FinalizeOnMaster, which an OutputFormat can 
> implement to run something on the Master after all subtasks have been closed.
>
> On 08/06/2020 17:25, Andrey Zagrebin wrote:
>
>> Hi Mark,
>>
>> I do not know how you output the results in your pipeline.
>> If you use DataSet#output(OutputFormat outputFormat), you could try to 
>> extend the format with a custom close method which should be called once the 
>> task of the sink batch operator is done in the task manager.
>> I also cc Aljoscha, maybe, he has more ideas.
>>
>> Best,
>> Andrey
>>
>> On Sun, Jun 7, 2020 at 1:35 PM Mark Davis  wrote:
>>
>>> Hi Jeff,
>>>
>>> Unfortunately this is not good enough for me.
>>> My clients are very volatile, they start a batch and can go away any moment 
>>> without waiting for it to finish. Think of an elastic web application or an 
>>> AWS Lambda.
>>>
>>> I hopped to find something what could be deployed to the cluster together 
>>> with the batch code. Maybe a hook to a job manager or similar. I do not 
>>> plan to run anything heavy there, just some formal cleanups.
>>> Is there something like this?
>>>
>>> Thank you!
>>>
>>>   Mark
>>>
>>> ‐‐‐ Original Message ‐‐‐
>>> On Saturday, June 6, 2020 4:29 PM, Jeff Zhang  wrote:
>>>
 It would run in the client side where ExecutionEnvironment is created.

 Mark Davis  于2020年6月6日周六 下午8:14写道:

> Hi Jeff,
>
> Thank you very much! That is exactly what I need.
>
> Where the listener code will run in the cluster deployment(YARN, k8s)?
> Will it be sent over the network?
>
> Thank you!
>
>   Mark
>
> ‐‐‐ Original Message ‐‐‐
> On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:
>
>> You can try JobListener which you can register to ExecutionEnvironment.
>>
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>>
>> Mark Davis  于2020年6月6日周六 上午12:00写道:
>>
>>> Hi there,
>>>
>>> I am running a Batch job with several outputs.
>>> Is there a way to run some code(e.g. release a distributed lock) after 
>>> all outputs are finished?
>>>
>>> Currently I do this in a try-finally block around 
>>> ExecutionEnvironment.execute() call, but I have to switch to the 
>>> detached execution mode - in this mode the finally block is never run.
>>>
>>> Thank you!
>>>
>>>   Mark
>>
>> --
>> Best Regards
>>
>> Jeff Zhang

 --
 Best Regards

 Jeff Zhang
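
For reference, a rough sketch of the JobListener registration mentioned above (Flink 1.10
API; the cleanup call is a placeholder). Note that the callbacks run on the client, so the
client still has to stay around long enough for onJobExecuted to fire:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        // called on the client once submission succeeded (or failed)
    }

    @Override
    public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
        // called on the client when the job finishes; release the distributed lock here
        releaseDistributedLock(); // placeholder for your own cleanup
    }
});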

Re: Flink SQL UDF dynamic types

2020-06-09 Thread kcz
Makes sense, I'll just use map then. Thanks.




------ Original message ------
From: "1048262223" <1048262...@qq.com>
Sent: Tuesday, June 9, 2020, 4:51 PM
To: "user-zh"

Re: Flink SQL UDF dynamic types

2020-06-09 Thread 1048262223
Hi


This can be done by returning a generic type such as Map.


Best,
Yichao Yang




------ Original message ------
From: "kcz" <573693...@qq.com>
Sent: Tuesday, June 9, 2020, 4:49 PM
To: "user-zh"

Re: Flink SQL UDF dynamic types

2020-06-09 Thread kcz
We actually need dynamic types as well. For example, when I write a column-to-row UDF I have to fix the number and the types of the returned fields, and whenever a field is added I need yet another UDF.




------ Original message ------
From: "Benchao Li"

Fwd: Understanding Flink statefun deployment

2020-06-09 Thread Francesco Guardiani
Hi everybody,
I'm quite new to Flink and Flink Statefun and I'm trying to understand the
deployment techniques on k8s.
I wish to understand if it's feasible to deploy a statefun project
separating the different functions on separate deployments (in order to
have some functions as remote and some as embedded) all connected to the
same master. The idea is that I can scale the deployments independently
using the Kubernetes HPA and these instances cooperate automatically using
the same master. For example, given a flow like kafka -> fn a -> fn b ->
kafka:

* Remote function A (plus ingress) in deployment fn-a, where the function
process is deployed as another container in the same pod
* embedded function B (plus egress) in deployment fn-b
* master deployment in flink-master

Does that make sense at all in Flink architecture? If it's feasible, do you
have any example?

FG

-- 
Francesco Guardiani
Website: https://slinkydeveloper.com/
Twitter: https://twitter.com/SlinkyGuardiani

Github: https://github.com/slinkydeveloper


Re: Question about BETWEEN in Flink SQL

2020-06-09 Thread Benchao Li
Could you add the following information?
1. Which Flink version are you using?
2. Which planner, the Blink planner or the old planner?
3. Streaming mode or batch mode?
4. What is the exact error message?

小屁孩 <932460...@qq.com> wrote on Tue, Jun 9, 2020 at 4:26 PM:

> hi, in Flink SQL I used select * from a join b on a.ip  b.endip and got an error saying the feature is not supported. Is there something like a BETWEEN function that can be used for this kind of condition?


Re: Internal state and external stores conditional usage advice sought (dynamodb asyncIO)

2020-06-09 Thread Andrey Zagrebin
Hi Orionemail,

There is no simple state access in asyncIO operator. I think this would
require a custom caching solution.
Maybe, other community users solved this problem in some other way.

Best,
Andrey
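
A minimal sketch of such a custom cache (assumptions: the DynamoDB access is represented
by a made-up IdMappingClient, and the cache is a plain in-memory map per parallel
instance, so it is not checkpointed and is lost on restart):

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class CachedIdLookup extends RichAsyncFunction<Record, Record> {

    private transient Map<String, String> cache;
    private transient IdMappingClient client; // hypothetical async DynamoDB wrapper

    @Override
    public void open(Configuration parameters) {
        cache = new ConcurrentHashMap<>();
        client = IdMappingClient.create();
    }

    @Override
    public void asyncInvoke(Record input, ResultFuture<Record> resultFuture) {
        String cached = cache.get(input.getId());
        if (cached != null) {
            // cache hit: skip the external lookup entirely
            resultFuture.complete(Collections.singleton(input.withId(cached)));
            return;
        }
        client.lookupOrCreate(input.getId())           // hypothetical CompletableFuture<String>
              .thenAccept(mapped -> {
                  cache.put(input.getId(), mapped);    // remember the mapping for next time
                  resultFuture.complete(Collections.singleton(input.withId(mapped)));
              });
    }
}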


On Mon, Jun 8, 2020 at 5:33 PM orionemail  wrote:

> Hi,
>
> Following on from an earlier email my approach has changed but I am still
> unsure how to best acheive my goal.
>
> I have records coming through a kinesis stream into flink:
>
> { id: 
>   var1: 
>   ...
> }
>
> 'id' needs to be replaced with a value from a DB store, or if not present
> in the DB generate in flink a new ID, cache the value and then store back
> in the db.  This is essentially a basic ID mapping service.
>
> Currently for each record I use asyncIO to get a value from Dynamo or
> generate and write the new value back to the DB.
>
> This is unnecissary as I should be able to cache this value after the
> first time it is seen/generated.
>
> What I want to do is cache the value from the DB after first fetch in some
> form of local state  but also update the DB.
>
> My confusion is over which of the API's or what I should use to do this?
>
> Currently my code looks something like:
>
> source = KeyedStream getKinesisSource().keyBy(pojo - pojo.id
> )
>
> SingleOutputStreamOperator ps = AsycDataStream.unorderedWait(source,
> new DynoProcessingCode(),
> ..
> ..).process(new processFunction())
>
> class processFunction extends ProcessFunction {
> ..
> }
>
> If I insert a KeyedProcessFunction after the keyby and before the asyncIO
> I could abort the Async process if the ID has already been read from the
> cache, but if I do need to fetch from the db, how do I store that in the
> keyed cache in the Async IO process?  It seems that maybe that is not
> possible and I should use Operator State?
>
> Any help appreciated.
>
> Thanks,
>
> O
>
>
> Sent with ProtonMail  Secure Email.
>
>


Question about BETWEEN in Flink SQL

2020-06-09 Thread ??????
hi, in Flink SQL I used select * from a join b on a.ip

Re: Age old stop vs cancel debate

2020-06-09 Thread Kostas Kloudas
Hi Senthil,

From a quick look at the code, it seems that the cancel() of your
source function should be called, and the thread that it is running on
should be interrupted.

In order to pin down the problem and help us see if this is an actual
bug, could you please:
1) enable debug logging and see if you can spot some lines like this:

"Starting checkpoint (-ID) SYNC_SAVEPOINT on task X" or sth
similar with synchronous savepoint in it

and any other message afterwards with -ID in it to see if the
savepoint is completed successfully.

2) could you see if this behavior persists in the FLINK-1.10?

Thanks,
Kostas
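
For context, a minimal sketch of the short-sleep pattern discussed further down in this
thread (class name and interval are made up):

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class PollingS3Source extends RichSourceFunction<String> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // fetch from S3 and emit records here ...

            // sleep in small increments instead of one long sleep,
            // checking the flag between naps so cancel/stop is picked up quickly
            long remaining = 60 * 60 * 1000L; // the "hours" interval
            while (isRunning && remaining > 0) {
                Thread.sleep(Math.min(remaining, 5_000L));
                remaining -= 5_000L;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}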

On Tue, Jun 2, 2020 at 4:20 PM Senthil Kumar  wrote:
>
> Robert,
>
>
>
> Thank you once again! We are currently doing the “short” Thread.sleep() 
> approach. Seems to be working fine.
>
>
>
> Cheers
>
> Kumar
>
>
>
> From: Robert Metzger 
> Date: Tuesday, June 2, 2020 at 2:40 AM
> To: Senthil Kumar 
> Cc: "user@flink.apache.org" 
> Subject: Re: Age old stop vs cancel debate
>
>
>
> Hi Kumar,
>
>
> this is more a Java question than a Flink question now :) If it is easily 
> possible from your code, then I would regularly check the isRunning flag (by 
> having short Thread.sleeps()) to have a proper cancellation behavior.
>
> If this makes your code very complicated, then you could work with manually 
> interrupting your worker thread. I would only use this method if you are sure 
> your code (and the libraries you are using) are properly handling interrupts.
>
> Sorry that I can not give you a more actionable response. It depends a lot on 
> the structure of your code and the libraries you are calling into.
>
>
>
> Best,
>
> Robert
>
>
>
>
>
> On Fri, May 29, 2020 at 10:48 PM Senthil Kumar  wrote:
>
> Hi Robert,
>
>
>
> Would appreciate more insights please.
>
>
>
> What we are noticing: When the flink job is issued a stop command, the 
> Thread.sleep is not receiving the InterruptedException
>
>
>
> It certainly receives the exception when the flink job is issued a cancel 
> command.
>
>
>
> In both cases (cancel and stop) the cancel() method is getting called (to set 
> the isRunning variable to false)
>
>
>
> However, given that the thread does not get interrupted in stop, we are not 
> in a position to check the isRunning variable.
>
>
>
>
>
> For now, we are doing a Thread.sleep  every 5 minutes (instead of the normal 
> interval which is in hours).
>
> Sleeping for 5 minutes gives us a chance to check the isRunning variable.
>
>
>
> Another approach would be to save the currentThread (Thread.currentThread()) 
> before doing a Thread.sleep())
>
> and manually calling Thread.interrupt() from the cancel function.
>
>
>
> What is your recommendation?
>
>
>
> Cheers
>
> Kumar
>
>
>
>
>
> From: Robert Metzger 
> Date: Friday, May 29, 2020 at 4:38 AM
> To: Senthil Kumar 
> Cc: "user@flink.apache.org" 
> Subject: Re: Age old stop vs cancel debate
>
>
>
> Hi Kumar,
>
>
>
> They way you've implemented your custom source sounds like the right way: 
> Having a "running" flag checked by the run() method and changing it in 
> cancel().
>
> Also, it is good that you are properly handling the interrupt set by Flink 
> (some people ignore InterruptedExceptions, which make it difficult (basically 
> impossible) for Flink to stop the job)
>
>
>
> Best,
>
> Robert
>
>
>
>
>
> On Wed, May 27, 2020 at 7:38 PM Senthil Kumar  wrote:
>
> We are on flink 1.9.0
>
>
>
> I have a custom SourceFunction, where I rely on isRunning set to false via 
> the cancel() function to exit out of the run loop.
>
> My run loop essentially gets the data from S3, and then simply sleeps 
> (Thread.sleep) for a specified time interval.
>
>
>
> When a job gets cancelled, the SourceFunction.cancel() is called, which sets 
> the isRunning to false.
>
> In addition, the Thread.sleep gets interrupted, a check Is made on the 
> isRunning variable (set to false now) and the run loop is exited.
>
>
>
> We noticed that when we “stop” the flink job, the Thread.sleep does not get 
> interrupted.
>
> It also appears that SoruceFunction.cancel() is not getting called (which 
> seems like the correct behavior for “stop”)
>
>
>
> My question: what’s the “right” way to exit the run() loop when the flink job 
> receives a stop command?
>
>
>
> My understanding was that there was a Stoppable interface (which got removed 
> in 1.9.0)
>
>
>
> Would appreciate any insights.
>
>
>
> Cheers
>
> Kumar


Re: [External Sender] Re: Flink sql nested elements

2020-06-09 Thread Dawid Wysakowicz
Hi Ramana,

Could you help us with a way to reproduce the behaviour? I could not
reproduce it locally. The code below works for me just fine:

StreamExecutionEnvironment exec = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
        exec,
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
tEnv.registerTableSource(
        "T",
        new StreamTableSource<Row>() {
            @Override
            public TableSchema getTableSchema() {
                return TableSchema.builder()
                        .field("f3", DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING())))
                        .build();
            }
            @Override
            public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv.fromCollection(
                        Arrays.asList(Row.of(Row.of("ABCDE")))
                );
            }
            @Override
            public DataType getProducedDataType() {
                return DataTypes.ROW(
                        DataTypes.FIELD(
                                "f3",
                                DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING()))
                        )
                );
            }
        });
Table table = tEnv.sqlQuery("SELECT f3.nested FROM T");
DataStream<Row> result = tEnv.toAppendStream(
        table,
        Types.ROW(Types.STRING()));
result.print();
exec.execute();

Best,

Dawid

On 05/06/2020 13:59, Ramana Uppala wrote:
> Hi Leonard,
>
> We are using Flink 1.10 version and I can not share the complete
> schema but it looks like below in Hive Catalog, 
>
> flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647),
> `postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
> VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
>
> Based on the stack trace, sqlUpdate API validates the sql statement
> and throwing the above error.  Do we need to configure any Calcite
> configuration to support nested types ?
>
> Thanks,
> Ramana.
>
> On Fri, Jun 5, 2020 at 12:49 AM Leonard Xu  > wrote:
>
> Hi,Ramana
>
> For nested data type, Flink use dot (eg a.b.c) to visit nested
> elements. Your SQL syntax looks right, which Flink version are you
> using? And could you post your Avro Schema file and DDL ?
>
> Best,
> Leonard Xu
>
> > 在 2020年6月5日,03:34,Ramana Uppala  > 写道:
> >
> > We have Avro schema that contains nested structure and when
> querying using Flink SQL, we are getting below error.
> >
> > Exception in thread "main" java.lang.AssertionError
> >       at
> org.apache.calcite.sql.parser.SqlParserPos.sum_(SqlParserPos.java:236)
> >       at
> org.apache.calcite.sql.parser.SqlParserPos.sum(SqlParserPos.java:226)
> >       at
> org.apache.calcite.sql.SqlIdentifier.getComponent(SqlIdentifier.java:232)
> >       at
> 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:416)
> >       at
> 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5733)
> >       at
> 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5718)
> >       at
> org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
> >
> > Example Schema:
> > ROW<`col1` VARCHAR(2147483647), `postalAddress`
> ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
> VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
> >
> > Example SQL:
> > insert into CSVSink
> > select
> > col1,
> > postalAddress.addressLine1 as address
> > from myStream
> >
> > In Flink SQL, How to select nested elements ?
> >
>
> 
>
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The
> information transmitted herewith is intended only for use by the
> individual or entity to which it is addressed. If the reader of this
> message is not the intended recipient, you are hereby notified that
> any review, retransmission, dissemination, distribution, copying or
> other use of, or taking of any action in reliance upon this
> information is strictly prohibited. If you have received this
> communication in error, please contact the sender and delete the
> material from your computer.
>
>




Re: [External Sender] Re: Avro Array type validation error

2020-06-09 Thread Dawid Wysakowicz
To make sure we are on the same page.

The end goal is to have the

CatalogTable#getTableSchema/TableSource#getTableSchema return a schema
that is compatible with TableSource#getProducedDataType.

If you want to use the new types, you should not implement the
TableSource#getReturnType. Moreover you should also not use any Flink
utilities that convert from TypeInformation to DataTypes as those
produce legacy types.

I am aware there is a lot of corner cases and we worked hard to improve
the situation with the new sources and sinks interfaces.

Below I add an example how you could pass different array types:

StreamExecutionEnvironment exec = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(exec);
tEnv.registerTableSource(
    "T",
    new StreamTableSource<Row>() {
        @Override
        public TableSchema getTableSchema() {
            return TableSchema.builder()
                .field("f0", DataTypes.ARRAY(DataTypes.BIGINT().notNull()))
                .field("f1", DataTypes.ARRAY(DataTypes.BIGINT()))
                .field("f2", DataTypes.ARRAY(DataTypes.STRING()))
                .build();
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.fromCollection(
                Arrays.asList(Row.of(new long[]{1}, new Long[]{new Long(1)}, new String[]{"ABCDE"})),
                // this is necessary for the STRING array, because otherwise DataStream produces a
                // different TypeInformation than the planner expects
                (TypeInformation) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType())
            );
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.ROW(
                DataTypes.FIELD(
                    "f0",
                    DataTypes.ARRAY(DataTypes.BIGINT().notNull().bridgedTo(long.class))
                        .bridgedTo(long[].class)),
                DataTypes.FIELD(
                    "f1",
                    DataTypes.ARRAY(DataTypes.BIGINT())),
                DataTypes.FIELD(
                    "f2",
                    DataTypes.ARRAY(DataTypes.STRING()))
            );
        }
    });

Table table = tEnv.sqlQuery("SELECT f0, f1, f2 FROM T");
DataStream<Row> result = tEnv.toAppendStream(
    table,
    Types.ROW(
        Types.PRIMITIVE_ARRAY(Types.LONG),
        ObjectArrayTypeInfo.getInfoFor(Types.LONG),
        ObjectArrayTypeInfo.getInfoFor(Types.STRING)));
result.print();
exec.execute();

Hope this will help and that it will be much easier in Flink 1.11

Best,

Dawid

On 05/06/2020 13:33, Ramana Uppala wrote:
> Hi Dawid,
>
> We are using a custom connector that is very similar to Flink Kafka
> Connector and  instantiating TableSchema using a custom class which
> maps Avro types to Flink's DataTypes using TableSchema.Builder.
>
> For Array type, we have below mapping:
>
>  case ARRAY:
>                 return
> DataTypes.ARRAY(toFlinkType(schema.getElementType()));
>
>
> We are using Hive Catalog and creating tables
> using CatalogTableImpl with TableSchema.
>
> As you mentioned, if we create TableSchema with legacy types, our
> connectors works without any issues. But, we want to use the new Flink
> DataTypes API but having issues.
>
> Also, one more observation is if we use legacy types in TableSource
> creation, application not working using Blink Planner. We are getting
> the same error physical type not matching.
>
> Looking forward to the 1.11 changes.
>
>
> On Fri, Jun 5, 2020 at 3:34 AM Dawid Wysakowicz
> mailto:dwysakow...@apache.org>> wrote:
>
> Hi Ramana,
>
> What connector do you use or how do you instantiate the TableSource?
> Also which catalog do you use and how do you register your table
> in that
> catalog?
>
> The problem is that conversion from TypeInformation to DataType
> produces
> legacy types (because they cannot be mapped exactyl 1-1 to the new
> types).
>
> If you can change the code of the TableSource you can return in the
> TableSource#getProducedType the tableSchema.toRowDataType, where the
> tableSchema is the schema coming from catalog. Or you can make
> sure that
> the catalog table produces the legacy type:
>
> TableSchema.field("field", Types.OBJECT_ARRAY(Types.STRING));
>
> In 1.11 we will introduce new sources and formats already working
> entirely with the new type system
> (AvroRowDataDeserializationSchema and
> KafkaDynamicTable).
>
> Hope this helps a bit.
>
> Best,
>
> Dawid
>
> On 04/06/2020 13:43, Ramana Uppala wrote:
> > Hi,
> > Avro schema contains Array type and we created
> TableSchema out of the AvroSchema and created a table in catalog.
> In 
