flink sql Temporal table join failed

2020-06-11 Thread Zhou Zach
SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in 
[jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

ERROR StatusLogger No log4j2 configuration file found. Using default 
configuration: logging only errors to the console.

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 




FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], 
fields=[time, sum_age])

+- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])

   +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])

  :- FlinkLogicalCalc(select=[uid, time])

  :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, 
clickCount, time)]]], fields=[uid, phoneType, clickCount, time])

  +- FlinkLogicalSnapshot(period=[$cor0.time])

 +- FlinkLogicalCalc(select=[uid, age])

+- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, age, 
created_time)]]], fields=[uid, sex, age, created_time])




Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left 
table's proctime field, doesn't support 'PROCTIME()'

Please check the documentation for the set of currently supported SQL features.

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)

at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)

at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)

at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)

at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)

at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)

at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)

at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)

at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)

at 
org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)

at 
org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)

Caused by: org.apache.flink.table.api.TableException: Temporal table join 
currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, 
doesn't support 'PROCTIME()'

at 
org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)

at 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:147)

at 
org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnCalcTableScanRule.matches(CommonLookupJoinRule.scala:161)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)

at 

Re: Reading files from multiple subdirectories

2020-06-11 Thread Yun Gao
Hi Lorenzo,

Read from a previouse thread [1] and the source code, I think you may set 
inputFormat.setNestedFileEnumeration(true) to also scan the nested files.

Best,
Yun

[1] 
https://lists.apache.org/thread.html/86a23b4c44d92c3adeb9ff4a708365fe4099796fb32deb6319e0e17f%40%3Cuser.flink.apache.org%3E



--
Sender:Lorenzo Nicora
Date:2020/06/11 21:31:20
Recipient:user
Theme:Reading files from multiple subdirectories

Hi,

related to the same case I am discussing in another thread, but not related to 
AVRO this time :) 

I need to ingest files a S3 Sink Kafka Connector periodically adds to an S3 
bucket.
Files are bucketed by date time as it often happens.

Is there any way, using Flink only, to monitor a base-path and detect new files 
in any subdirectories? 
Or I need to use something external to move new files in a single directory?

I am currently using 
env.readFile(inputFormat, path, PROCESS_CONTINUOUSLY, 6)
with AvroInputFormat, but it seems it can only monitor a single directory


Cheers
Lorenzo 


Re:Flink/SparkStreaming 性能测试(吞吐/延时)

2020-06-11 Thread Michael Ran
挺好的!1.记录进出flink的时间,如果时间生成规则一致,那么flink、spark 
消耗至少是一致的,具有可对比性吧。性能影响会有的,不大。2.是不是可以尝试  
数据字节大小,算子,窗口,资源,场景(简单ETL,维表JOIN,双流join,触发背压,复杂SQL 计算)   SQL 有标准的测试
在 2020-06-12 10:49:26,"Zhonghan Tang" <13122260...@163.com> 写道:
>Hi,
>近期我需要做一个 Flink/SparkStreaming 吞吐/延时的性能分析, 我发现网上这方面资料很少,  只有17年美团/15年yahoo 
>做了一个类似的分析. 问题如下:
>1. 简单的读kafka写kafka 要如何记录数据进flink/出flink 的时间? 如果是打时间戳要怎么打? 打的话会不会影响性能?
>2. 我想到的场景是: 简单插数etl, 基本的过滤, 窗口. 请问还有没有什么定性定量的方式可以测量框架性能?
>
>
>美团链接:
>https://tech.meituan.com/2017/11/17/flink-benchmark.html?spm=a2c6h.13066369.0.0.5e3c1455V4UrXH
>yahoo:
>https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
>
>
>| |
>Zhonghan Tang
>|
>|
>13122260...@163.com
>|
>签名由网易邮箱大师定制
>


Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-11 Thread Xintong Song
Hi Vijay,

The memory configurations in Flink 1.9 and previous versions are indeed
complicated and confusing. That is why we made significant changes to it in
Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or the
upcoming Flink 1.11 which is very likely to be released in this month.

Regarding your questions,

   - "Physical Memory" displayed on the web ui stands for the total memory
   on your machine. This information is retrieved from your OS. It is not
   related to the network memory calculation. It is displayed mainly for
   historical reasons.
   - The error message means that you have about 26.8 GB network memory
   (877118 * 32768 bytes), and your job is trying to use more.
   - The "total memory" referred in network memory calculation is:
  - jvm-heap + network, if managed memory is configured on-heap
  (default)
 - According to your screenshot, the managed memory
 on-heap/off-heap configuration is not touched, so this should
be your case.
  - jvm-heap + managed + network, if managed memory is configured
  off-heap
   - The network memory size is actually derived reversely. Flink reads the
   max heap size from JVM (and the managed memory size from configuration if
   it is configured off-heap), and derives the network memory size with the
   following equation.
  - networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap /
  (1-networkFraction) * networkFraction))
  - In your case, networkMem = Min(50GB, Max(500MB, 29GB / (1-0.48) *
  0.48)) = 26.8GB

One thing I don't understand is, why do you only have 29GB heap size when
"taskmanager.heap.size" is configured to be "1044221m" (about 102 GB). The
JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use "total"
to represent "taskmanager.heap.size" for short. Also omitted the
calculations when managed memory is configured off-heap.

   - Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 -
   0.48) = 53 GB
   - On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) *
   (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) =
   40.6GB

Have you specified a custom "-Xmx" parameter?

Thank you~

Xintong Song



On Fri, Jun 12, 2020 at 7:50 AM Vijay Balakrishnan 
wrote:

> Hi,
> Get this error:
> java.io.IOException: Insufficient number of network buffers: required 2,
> but only 0 available. The total number of network buffers is currently set
> to 877118 of 32768 bytes each. You can increase this number by setting the
> configuration keys 'taskmanager.network.memory.fraction',
> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#-1420732632]] after [1 ms]. Message
> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A
> typical reason for `AskTimeoutException` is that the recipient actor didn't
> send a reply.
>
>
> Followed docs here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>
> network = Min(max, Max(min, fraction x total)  //what does Total mean -
> The max JVM heap is used to derive the total memory for the calculation of
> network buffers. - can I see it in the Flink Dashboard ??? 117GB here ?
> = Min(50G, Max(500mb, Max(0.48 * 117G))  ) = MIn(50G, 56.16G)= 50G
> 877118 of 32768 bytes each comes to 28.75GB. So, why is it failing ?
> Used this in flink-conf.yaml:
> taskmanager.numberOfTaskSlots: 10
> rest.server.max-content-length: 314572800
> taskmanager.network.memory.fraction: 0.45
> taskmanager.network.memory.max: 50gb
> taskmanager.network.memory.min: 500mb
> akka.ask.timeout: 240s
> cluster.evenly-spread-out-slots: true
> akka.tcp.timeout: 240s
> taskmanager.network.request-backoff.initial: 5000
> taskmanager.network.request-backoff.max: 3
> web.timeout:100
> web.refresh-interval:6000
>
> Saw some old calc about buffers
> (slots/Tm * slots/TM) * #TMs * 4
> =10 * 10 * 47 * 4 = 18,800 buffers.
>
> What am I missing in the network buffer calc ??
>
> TIA,
>
>
>


Re: flink TableEnvironment can not call getTableEnvironment api

2020-06-11 Thread Leonard Xu
是的,我代码贴错了, 你参考下下面的链接

> 在 2020年6月12日,11:55,Zhou Zach  写道:
> 
> 感谢回复,不过,根据文档
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html
> 只能用Blink planner吧
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-06-12 11:49:08,"Leonard Xu"  写道:
>> Hi,
>> 这个文档应该是1.9时更新漏掉了,我建个issue修复下。现在可以这样使用[1]:
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>   .useOldPlanner()
>>   .inStreamingMode()
>>   .build();
>> StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, 
>> envSettings);
>> 
>> 
>> Best,
>> Leonard Xu
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
>>  
>> 
>> 
>>> 在 2020年6月12日,11:39,Zhou Zach  写道:
>>> 
>>> 
>>> 
>>> flink version 1.10.0
>>> 
>>> 
>>> 根据文档
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html#defining-temporal-table
>>> 想要Defining Temporal Table,但是没有发现getTableEnvironment。。
>>> 
>>> 
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> val tEnv = TableEnvironment.getTableEnvironment(env)
>> 



Re:Re: flink TableEnvironment can not call getTableEnvironment api

2020-06-11 Thread Zhou Zach
感谢回复,不过,根据文档
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html
只能用Blink planner吧

















在 2020-06-12 11:49:08,"Leonard Xu"  写道:
>Hi,
>这个文档应该是1.9时更新漏掉了,我建个issue修复下。现在可以这样使用[1]:
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>.useOldPlanner()
>.inStreamingMode()
>.build();
>StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, 
>envSettings);
>
>
>Best,
>Leonard Xu
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
> 
>
>
>> 在 2020年6月12日,11:39,Zhou Zach  写道:
>> 
>> 
>> 
>> flink version 1.10.0
>> 
>> 
>> 根据文档
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html#defining-temporal-table
>> 想要Defining Temporal Table,但是没有发现getTableEnvironment。。
>> 
>> 
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tEnv = TableEnvironment.getTableEnvironment(env)
>


Re: flink TableEnvironment can not call getTableEnvironment api

2020-06-11 Thread Leonard Xu
Hi,
这个文档应该是1.9时更新漏掉了,我建个issue修复下。现在可以这样使用[1]:
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, 
envSettings);


Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
 


> 在 2020年6月12日,11:39,Zhou Zach  写道:
> 
> 
> 
> flink version 1.10.0
> 
> 
> 根据文档
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html#defining-temporal-table
> 想要Defining Temporal Table,但是没有发现getTableEnvironment。。
> 
> 
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)



回复:Flink kerberos环境下多个keytab认证问题

2020-06-11 Thread 张宇昂
可以试试用户代理吧,不用把所有keytab都加上



---原始邮件---
发件人: "zhangjunjie1...@163.com"

flink TableEnvironment can not call getTableEnvironment api

2020-06-11 Thread Zhou Zach


flink version 1.10.0


根据文档
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html#defining-temporal-table
想要Defining Temporal Table,但是没有发现getTableEnvironment。。


val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

Flink kerberos环境下多个keytab认证问题

2020-06-11 Thread zhangjunjie1...@163.com
Flink1.9环境下,搭建Flink on yarn平台,用户之间实现租户/资源隔离,在kerberos环境下运行Flink 
perjob模式,需要在Flink-conf.yaml中添加:
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/flink/p_zhangjunjie.keytab
security.kerberos.login.principal: p_zhangjun...@local.com 
但是如果多个用户使用Flink环境资源,比如说除了p_zhangjunjie,还是p_wanglin,然后我在Flink-conf.yaml中添加:
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/flink/p_zhangjunjie.keytab
security.kerberos.login.principal: p_zhangjun...@local.com 

security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/flink/p_wanglin.keytab
security.kerberos.login.principal: p_wang...@local.com 
但是只有最下面的一个(比如p_wanglin)生效。使用p_zhangjunjie就报错:那如何实现多个keytab用户同时生效呢?

谢谢!麻烦大家帮忙解决一下了。哪怕提供个思路都可以。





zhangjunjie1...@163.com


Re: The network memory min (64 mb) and max (1 gb) mismatch

2020-06-11 Thread Xintong Song
Hi Clay,

Could you verify the "taskmanager.sh" used is the same script shipped with
Flink-1.10.1? Or a custom script is used? Also, does the jar file
"bash-java-utils.jar" exist in your Flink bin directory?

In Flink 1.10, the memory configuration for a TaskManager works as follows.

   - "taskmanager.sh" executes "bash-java-utils.jar" for the memory
   calculations
   - "bash-java-utils.jar" will read your "flink-conf.yaml" and all the
   "-D" arguments, and calculate memory sizes accordingly
   - "bash-java-utils.jar" will then return the memory calculation results
   as two strings, for JVM parameter ("-Xmx", "-Xms", etc.) and dynamic
   configurations ("-D") respectively
   - At this step, all the detailed memory sizes should be determined
  - That means, even for memory sizes not configured by you, there
  should be an exact value generated in the returned dynamic configuration
  - That also means, for memory components configured in ranges (e.g.,
  network memory configured through a pair of [min, max]), a deterministic
  value should be decided and both min/max configuration options should
  already been overwrite to that value
   - "taskmanager.sh" starts the task manager JVM process with the returned
   JVM parameters, and passes the dynamic configurations as arguments into the
   task manager process. These dynamic configurations will be read by Flink
   task manager so that memory will be managed accordingly.

Flink task manager expects all the memory configurations are already set
(thus network min/max should have the same value) before it's started. In
your case, it seems such configurations are missing. Same for the cpu cores.

Thank you~

Xintong Song



On Fri, Jun 12, 2020 at 12:58 AM Clay Teeter  wrote:

> Hi flink fans,
>
> I'm hoping for an easy solution.  I'm trying to upgrade my 9.3 cluster to
> flink 10.1, but i'm running into memory configuration errors.
>
> Such as:
> *Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> The network memory min (64 mb) and max (1 gb) mismatch, the network memory
> has to be resolved and set to a fixed value before task executor starts*
>
> *Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> The required configuration option Key: 'taskmanager.cpu.cores' , default:
> null (fallback keys: []) is not set*
>
> I was able to fix a cascade of errors by explicitly setting these values:
>
> taskmanager.memory.managed.size: {{
> .Values.analytics.flink.taskManagerManagedSize }}
> taskmanager.memory.task.heap.size: {{
> .Values.analytics.flink.taskManagerHeapSize }}
> taskmanager.memory.jvm-metaspace.size: 500m
> taskmanager.cpu.cores: 4
>
> So, the documentation implies that flink will default many of these
> values, however my 101. cluster doesn't seem to be doing this.  9.3, worked
> great!
>
> Do I really have to set all the memory (even network) values?  If not,
> what am I missing?
>
> If i do have to set all the memory parameters, how do I resolve "The
> network memory min (64 mb) and max (1 gb) mismatch"?
>
>
> My cluster runs standalone jobs on kube
>
> flnk-config.yaml:
> state.backend: rocksdb
> state.backend.incremental: true
> state.checkpoints.num-retained: 1
> taskmanager.memory.managed.size: {{
> .Values.analytics.flink.taskManagerManagedSize }}
> taskmanager.memory.task.heap.size: {{
> .Values.analytics.flink.taskManagerHeapSize }}
> taskmanager.memory.jvm-metaspace.size: 500m
> taskmanager.cpu.cores: 4
> taskmanager.numberOfTaskSlots: {{
> .Values.analytics.task.numberOfTaskSlots }}
> parallelism.default: {{ .Values.analytics.flink.parallelism }}
>
>
> JobManger:
> command: ["/opt/flink/bin/standalone-job.sh"]
> args: ["start-foreground", "-j={{ .Values.analytics.flinkRunnable
> }}",  ...
>
> TakManager
> command: ["/opt/flink/bin/taskmanager.sh"]
> args: [
>   "start-foreground",
>   "-Djobmanager.rpc.address=localhost",
>   "-Dmetrics.reporter.prom.port=9430"]
>
>
>
>


Re: Flink/SparkStreaming 性能测试(吞吐/延时)

2020-06-11 Thread Kurt Young
我们最近做了一个基于beam nexmark的性能对比测试[1],你可以参考一下。
和beam的测试不同的是,我们用各自引擎的API对着测试case描述的场景重新写了一下,并不是像这个里面一样全都用
beam的api写测试case,然后翻译到多个runner之上。

[1] https://beam.apache.org/documentation/sdks/java/testing/nexmark/

Best,
Kurt


On Fri, Jun 12, 2020 at 10:49 AM Zhonghan Tang <13122260...@163.com> wrote:

> Hi,
> 近期我需要做一个 Flink/SparkStreaming 吞吐/延时的性能分析, 我发现网上这方面资料很少,  只有17年美团/15年yahoo
> 做了一个类似的分析. 问题如下:
> 1. 简单的读kafka写kafka 要如何记录数据进flink/出flink 的时间? 如果是打时间戳要怎么打? 打的话会不会影响性能?
> 2. 我想到的场景是: 简单插数etl, 基本的过滤, 窗口. 请问还有没有什么定性定量的方式可以测量框架性能?
>
>
> 美团链接:
>
> https://tech.meituan.com/2017/11/17/flink-benchmark.html?spm=a2c6h.13066369.0.0.5e3c1455V4UrXH
> yahoo:
>
> https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
>
>
> | |
> Zhonghan Tang
> |
> |
> 13122260...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re:kafka connector从指定timestamp开始消费

2020-06-11 Thread Matt Wang
hi,这个功能目前已经在 Flink 中实现了,参考 [1],1.11.0 开始支持


[1]. https://issues.apache.org/jira/browse/FLINK-15220;




---
Best,
Matt Wang


On 06/12/2020 10:37,Kyle Zhang wrote:
Hi,
kafka connector 
ddl能不能像flinkKafkaConsumer.setStartFromTimestamp(xx)一样从指定timestamp开始消费,我看文档里只提到了earliest-offset,latest-offset,group-offsets,specific-offsets

CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'kafka',   

'connector.version' = '0.11', -- required: valid connector versions are
-- "0.8", "0.9", "0.10", "0.11", and "universal"

'connector.topic' = 'topic_name', -- required: topic name from which the table 
is read

'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: 
specify the ZooKeeper connection string
'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: 
specify the Kafka server connection string
'connector.properties.group.id' = 'testGroup', --optional: required in Kafka 
consumer, specify consumer group
'connector.startup-mode' = 'earliest-offset',-- optional: valid modes are 
"earliest-offset",
-- "latest-offset", "group-offsets",
-- or "specific-offsets"

-- optional: used in case of startup mode with specific offsets
'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

'connector.sink-partitioner' = '...',  -- optional: output partitioning from 
Flink's partitions
-- into Kafka's partitions valid are "fixed"
-- (each Flink partition ends up in at most one Kafka partition),
-- "round-robin" (a Flink partition is distributed to
-- Kafka partitions round-robin)
-- "custom" (use a custom FlinkKafkaPartitioner subclass)
-- optional: used in case of sink partitioner custom
'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',

'format.type' = '...', -- required: Kafka connector requires to 
specify a format,
...-- the supported formats are 'csv', 
'json' and 'avro'.
-- Please refer to Table Formats section for more details.
)



Flink/SparkStreaming 性能测试(吞吐/延时)

2020-06-11 Thread Zhonghan Tang
Hi,
近期我需要做一个 Flink/SparkStreaming 吞吐/延时的性能分析, 我发现网上这方面资料很少,  只有17年美团/15年yahoo 
做了一个类似的分析. 问题如下:
1. 简单的读kafka写kafka 要如何记录数据进flink/出flink 的时间? 如果是打时间戳要怎么打? 打的话会不会影响性能?
2. 我想到的场景是: 简单插数etl, 基本的过滤, 窗口. 请问还有没有什么定性定量的方式可以测量框架性能?


美团链接:
https://tech.meituan.com/2017/11/17/flink-benchmark.html?spm=a2c6h.13066369.0.0.5e3c1455V4UrXH
yahoo:
https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at


| |
Zhonghan Tang
|
|
13122260...@163.com
|
签名由网易邮箱大师定制



回复:kafka connector从指定timestamp开始消费

2020-06-11 Thread claylin
目前版本不支持,我看1.11版本支持,其实可以自己修改支持


 
---原始邮件---
发件人: "Kyle Zhang"

kafka connector从指定timestamp开始消费

2020-06-11 Thread Kyle Zhang
Hi,
kafka connector 
ddl能不能像flinkKafkaConsumer.setStartFromTimestamp(xx)一样从指定timestamp开始消费,我看文档里只提到了earliest-offset,latest-offset,group-offsets,specific-offsets

CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'kafka',   

  'connector.version' = '0.11', -- required: valid connector versions are
-- "0.8", "0.9", "0.10", "0.11", and 
"universal"

  'connector.topic' = 'topic_name', -- required: topic name from which the 
table is read

  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: 
specify the ZooKeeper connection string
  'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: 
specify the Kafka server connection string
  'connector.properties.group.id' = 'testGroup', --optional: required in Kafka 
consumer, specify consumer group
  'connector.startup-mode' = 'earliest-offset',-- optional: valid modes are 
"earliest-offset", 
   -- "latest-offset", 
"group-offsets", 
   -- or "specific-offsets"

  -- optional: used in case of startup mode with specific offsets
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

  'connector.sink-partitioner' = '...',  -- optional: output partitioning from 
Flink's partitions 
 -- into Kafka's partitions valid are 
"fixed" 
 -- (each Flink partition ends up in at 
most one Kafka partition),
 -- "round-robin" (a Flink partition is 
distributed to 
 -- Kafka partitions round-robin)
 -- "custom" (use a custom 
FlinkKafkaPartitioner subclass)
  -- optional: used in case of sink partitioner custom
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
  
  'format.type' = '...', -- required: Kafka connector requires 
to specify a format,
  ...-- the supported formats are 'csv', 
'json' and 'avro'.
 -- Please refer to Table Formats 
section for more details.
)



Re: 回复:flink on yarn模式的代码运行位置问题

2020-06-11 Thread Yang Wang
> 如何区分代码是运行在Client/JobManager/TaskManager里面?

Yarn perjob模式,用户的main方法运行在Client端,编译生成JobGraph会ship到JobManager进行
任务的调度。我理解用户的代码一般是不会在JobManager端运行的,JM会将task调度到TaskManager
上运行。对于Yarn application模式,用户main运行在JobManager生成JobGraph,其他相同。


> jarB中mainClass是否已经运行在yarn上了?还是运行在服务器端?

还是如上所说,取决于你是deployJobCluster还是deployApplicationCluster,在1.11以前只有deployJobCluster,
也就是mainClass是在client端运行(调用deployJobCluster的地方)。这个是预期内的,因为需要生成
JobGraph之后再上传,也是为了解决这个问题引入了Application Mode[1].


[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode


Best,
Yang


zjfpla...@hotmail.com  于2020年6月11日周四 下午3:55写道:

> 未运行在yarn的容器里面,还在服务器java -cp的进程里面
>
>
>
> zjfpla...@hotmail.com
>
> 发件人: Yichao Yang
> 发送时间: 2020-06-11 15:53
> 收件人: user-zh
> 主题: 回复:flink on yarn模式的代码运行位置问题
> Hi
>
>
> yarn是用作资源管理调度,你所说的未运行在yarn是没有运行在yarn的服务器上吗?
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"zjfpla...@hotmail.com" 发送时间:2020年6月11日(星期四) 下午2:32
> 收件人:"user-zh"
> 主题:flink on yarn模式的代码运行位置问题
>
>
>
> Hi,
>  我在使用flink的过程中,有些疑问请教下各位:
> 
> 1.flink分为jobmanger和taskmanager,我怎么区分哪些代码是运行在jobmanager,哪些在taskmanager?
> 
> 2.假设我jarA中使用AbstractYarnClusterDescriptor.deployJobCluster()替代flink
> run命令(想直接通过jar包启动方式直接提交flink任务上yarn),部署jarB到yarn上,jarB中mainClass中使用StreamExecutionEnvironment.execute去执行流任务,通过java
> -cp jarA的方式来启动,首先能确定的一点是jarA运行在服务器本地端,jarB中mainClass是否已经运行在yarn上了?还是运行在服务器端?
>  服务器端指的是java -cp执行的服务器
>  我这边本身从日志打印和远程debug(java -cp中加remote
> debug参数和flink-conf.yaml中加入remote
> debug配置)来看,jarB中的mainClass还未运行在yarn上,这个是什么原因?
> 
> 
>
>
> zjfpla...@hotmail.com
>


Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-11 Thread LakeShen
Hi ZheFu,

可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
的数据是否都已经 Sink 到了 kafka.

也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。

具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。

Best,
LakeShen

Congxian Qiu  于2020年6月11日周四 上午9:50写道:

> Hi
>
> 从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by:
> java.lang.IllegalStateException: Pending record count must be zero at this
> point: 5”,需要看一下为什么会走到这里
>
> Best,
> Congxian
>
>
> 李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道:
>
> >
> >
> 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。
> >
> > > 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> > >
> > > 补充一下,在TaskManager发现了如下错误日志:
> > >
> > > 2020-06-10 12:44:40,688 ERROR
> > > org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> > > during disposal of stream operator.
> > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
> > to
> > > send data to Kafka: Pending record count must be zero at this point: 5
> > > at
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> > > at
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> > > at
> > >
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> > > at
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> > > at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> > > at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> > > at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> > > at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.lang.IllegalStateException: Pending record count must
> be
> > > zero at this point: 5
> > > at
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> > > at
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> > > ... 8 more
> > >
> > > 希望得到帮助,感谢!
> > >
> > >
> > > Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> > >
> > >> Hi all,
> > >>
> > >> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> > >> Source: Custom Source -> Map -> Source_Map -> Empty_Filer ->
> > Field_Filter
> > >> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
> > >>
> > >>
> > >>
> >
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
> > >>
> > >> 部分报错信息如下:
> > >> 2020-06-10 12:02:49,083 INFO
> > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > Triggering
> > >> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
> > >> 2020-06-10 12:04:47,898 INFO
> > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Decline
> > >> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> > >> c41f4811262db1c4c270b136571c8201 at
> > >> container_e27_1591466310139_21670_01_06 @
> > >> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> > >> 2020-06-10 12:04:47,899 INFO
> > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > Discarding
> > >> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> > >> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> > >> complete snapshot 1 for operator Source: Custom Source -> Map ->
> > Source_Map
> > >> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map
> ->
> > Map
> > >> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was
> declined.
> > >> at
> > >>
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
> > >> at
> > >>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> > >> at
> > >>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
> > >> at
> > >>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> > >> at
> > >>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> > >> at
> > >>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> > >> at
> > >>
> >
> 

Re: Does anyone have an example of Bazel working with Flink?

2020-06-11 Thread Austin Cawley-Edwards
Hey all,

Adding to Aaron's response, we use Bazel to build our Flink apps. We've
open-sourced some of our setup here[1] though a bit outdated. There are
definitely rough edges/ probably needs a good deal of work to fit other
setups. We have written a wrapper around the `java_library` and
`java_binary` and could do the same for `rules_scala`, though we just
started using Bazel last November and have a lot to learn in terms of best
practices there.

If you're interested in contributing to a `rules_flink` project, I would be
as well!

Best,
Austin

[1]: https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020

On Thu, Jun 11, 2020 at 6:14 PM Aaron Levin  wrote:

> Hi Dan,
>
> We use Bazel to compile our Flink applications. We're using "rules_scala" (
> https://github.com/bazelbuild/rules_scala) to manage the dependencies and
> produce jars. We haven't had any issues. However, I have found that
> sometimes it's difficult to figure out exactly what Flink target or
> dependency my application needs.
>
> Unfortunately I'm not sure what issue you're seeing here. I would guess
> either your flink application wasn't compiled into the jar
> you're executing. If you can paste the bazel target used to generate your
> jar and how you're launching the application, that will be helpful
> for diagnosis.
>
> On Thu, Jun 11, 2020 at 5:21 PM Dan Hill  wrote:
>
>> I took the Flink playground and I'm trying to swap out Maven for Bazel.
>> I got to the point where I'm hitting the following error.  I want to diff
>> my code with an existing, working setup.
>>
>> Thanks! - Dan
>>
>>
>> client_1| 
>> org.apache.flink.client.program.ProgramInvocationException:
>> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
>> file.
>>
>> client_1| at
>> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)
>>
>> client_1| at
>> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:190)
>>
>> client_1| at
>> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:128)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>
>> client_1| at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>
>


Re: Does anyone have an example of Bazel working with Flink?

2020-06-11 Thread Aaron Levin
Hi Dan,

We use Bazel to compile our Flink applications. We're using "rules_scala" (
https://github.com/bazelbuild/rules_scala) to manage the dependencies and
produce jars. We haven't had any issues. However, I have found that
sometimes it's difficult to figure out exactly what Flink target or
dependency my application needs.

Unfortunately I'm not sure what issue you're seeing here. I would guess
either your flink application wasn't compiled into the jar
you're executing. If you can paste the bazel target used to generate your
jar and how you're launching the application, that will be helpful
for diagnosis.

On Thu, Jun 11, 2020 at 5:21 PM Dan Hill  wrote:

> I took the Flink playground and I'm trying to swap out Maven for Bazel.  I
> got to the point where I'm hitting the following error.  I want to diff my
> code with an existing, working setup.
>
> Thanks! - Dan
>
>
> client_1| 
> org.apache.flink.client.program.ProgramInvocationException:
> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
> file.
>
> client_1| at
> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)
>
> client_1| at
> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:190)
>
> client_1| at
> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:128)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>
> client_1| at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>


Does anyone have an example of Bazel working with Flink?

2020-06-11 Thread Dan Hill
I took the Flink playground and I'm trying to swap out Maven for Bazel.  I
got to the point where I'm hitting the following error.  I want to diff my
code with an existing, working setup.

Thanks! - Dan


client_1|
org.apache.flink.client.program.ProgramInvocationException:
Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
file.

client_1| at
org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)

client_1| at
org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:190)

client_1| at
org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:128)

client_1| at
org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)

client_1| at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)

client_1| at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)

client_1| at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)

client_1| at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

client_1| at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)


Re: Shipping Filesystem Plugins with YarnClusterDescriptor

2020-06-11 Thread Kostas Kloudas
Hi John,

I think that using different plugins is not going to be an issue,
assuming that the scheme of your FS's do not collide. This is already
the case for S3 within Flink, where we have 2 implementations, one
based on Presto and one based on Hadoop. For the first you can use the
scheme s3p while for the latter s3a.

Now for different versions of the same plugin, this can be an issue in
the case that all of them are present concurrently in your plugins
directory. But is this the case, or only the latest version of a given
plugin is present?

Keep in mind that after uploading, the "remote" plugins dir is not
shared among applications but it is "private" to each one of them.

Cheers,
Kostas

On Thu, Jun 11, 2020 at 5:12 PM John Mathews  wrote:
>
> So I think that will work, but it has some limitations. Namely, when 
> launching clusters through a service (which is our use case), it can be the 
> case that multiple different clients want clusters with different plugins or 
> different versions of a given plugin, but because the FlinkClusterDescriptor 
> currently reads where to get the plugins to ship from an environment 
> variable, there is a race condition where that directory could contain 
> plugins from multiple different in-flight requests to spin up a cluster.
>
> I think a possible solution is to expose configuration on the 
> YarnClusterDescriptor that is similar to the shipFiles list, but is instead a 
> shipPlugins list, that way, the plugins that get shipping are per yarn 
> cluster request instead of on a global level.
>
> Do you see any workarounds for the issue I described? Also, does the idea I 
> propose make sense as a solution?
>
>
>
> On Wed, Jun 10, 2020 at 9:16 PM Yangze Guo  wrote:
>>
>> Hi, John,
>>
>> AFAIK, Flink will automatically help you to ship the "plugins/"
>> directory of your Flink distribution to Yarn[1]. So, you just need to
>> make a directory in "plugins/" and put your custom jar into it. Do you
>> meet any problem with this approach?
>>
>> [1] 
>> https://github.com/apache/flink/blob/216f65fff10fb0957e324570662d075be66bacdf/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L770
>>
>> Best,
>> Yangze Guo
>>
>> On Wed, Jun 10, 2020 at 11:29 PM John Mathews  wrote:
>> >
>> > Hello,
>> >
>> > I have a custom filesystem that I am trying to migrate to the plugins 
>> > model described here: 
>> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#adding-a-new-pluggable-file-system-implementation,
>> >  but it is unclear to me how to dynamically get the plugins directory to 
>> > be available when launching using a Yarn Cluster Descriptor. One thought 
>> > was to add the plugins to the shipFilesList, but I don't think that would 
>> > result in the plugins being in the correct directory location for Flink to 
>> > discover it.
>> >
>> > Is there another way to get the plugins onto the host when launching the 
>> > cluster? Or is there a different recommended way of doing this? Happy to 
>> > answer any questions if something is unclear.
>> >
>> > Thanks so much for your help!
>> >
>> > John


The network memory min (64 mb) and max (1 gb) mismatch

2020-06-11 Thread Clay Teeter
Hi flink fans,

I'm hoping for an easy solution.  I'm trying to upgrade my 9.3 cluster to
flink 10.1, but i'm running into memory configuration errors.

Such as:
*Caused by: org.apache.flink.configuration.IllegalConfigurationException:
The network memory min (64 mb) and max (1 gb) mismatch, the network memory
has to be resolved and set to a fixed value before task executor starts*

*Caused by: org.apache.flink.configuration.IllegalConfigurationException:
The required configuration option Key: 'taskmanager.cpu.cores' , default:
null (fallback keys: []) is not set*

I was able to fix a cascade of errors by explicitly setting these values:

taskmanager.memory.managed.size: {{
.Values.analytics.flink.taskManagerManagedSize }}
taskmanager.memory.task.heap.size: {{
.Values.analytics.flink.taskManagerHeapSize }}
taskmanager.memory.jvm-metaspace.size: 500m
taskmanager.cpu.cores: 4

So, the documentation implies that flink will default many of these values,
however my 101. cluster doesn't seem to be doing this.  9.3, worked great!


Do I really have to set all the memory (even network) values?  If not, what
am I missing?

If i do have to set all the memory parameters, how do I resolve "The
network memory min (64 mb) and max (1 gb) mismatch"?


My cluster runs standalone jobs on kube

flnk-config.yaml:
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.num-retained: 1
taskmanager.memory.managed.size: {{
.Values.analytics.flink.taskManagerManagedSize }}
taskmanager.memory.task.heap.size: {{
.Values.analytics.flink.taskManagerHeapSize }}
taskmanager.memory.jvm-metaspace.size: 500m
taskmanager.cpu.cores: 4
taskmanager.numberOfTaskSlots: {{
.Values.analytics.task.numberOfTaskSlots }}
parallelism.default: {{ .Values.analytics.flink.parallelism }}


JobManger:
command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground", "-j={{ .Values.analytics.flinkRunnable
}}",  ...

TakManager
command: ["/opt/flink/bin/taskmanager.sh"]
args: [
  "start-foreground",
  "-Djobmanager.rpc.address=localhost",
  "-Dmetrics.reporter.prom.port=9430"]


Error incompatible types for field cpuCores when doing Flink Upgrade

2020-06-11 Thread Claude Murad
Hello,

I upgraded Flink from 1.7 to 1.10 in Kubernetes.  When the job manager is
launched, the following exception occurs.  If I do some cleanup in
zookeeper and re-start, it will work.  Any ideas about this error and what
needs to be done without having to do cleanup in zookeeper?

ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal
error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkRuntimeException: Could not recover job with job
id bfb2a937257727e080ca85933586f38b.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover
job with job id bfb2a937257727e080ca85933586f38b.
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
at
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:184)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 3 more
Caused by: org.apache.flink.util.FlinkException: Could not retrieve
submitted JobGraph from state handle under
/bfb2a937257727e080ca85933586f38b. This indicates that the retrieved state
handle is broken. Try cleaning the state handle store.
at
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:191)
at
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
... 7 more
Caused by: java.io.InvalidClassException:
org.apache.flink.api.common.operators.ResourceSpec; incompatible types for
field cpuCores
at java.io.ObjectStreamClass.matchFields(ObjectStreamClass.java:2467)
at java.io.ObjectStreamClass.getReflector(ObjectStreamClass.java:2361)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:753)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
at java.util.ArrayList.readObject(ArrayList.java:797)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2235)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
at 

Re: Shipping Filesystem Plugins with YarnClusterDescriptor

2020-06-11 Thread John Mathews
So I think that will work, but it has some limitations. Namely, when
launching clusters through a service (which is our use case), it can be the
case that multiple different clients want clusters with different plugins
or different versions of a given plugin, but because the
FlinkClusterDescriptor currently reads where to get the plugins to ship
from an environment variable, there is a race condition where that
directory could contain plugins from multiple different in-flight requests
to spin up a cluster.

I think a possible solution is to expose configuration on the
YarnClusterDescriptor that is similar to the shipFiles list, but is instead
a shipPlugins list, that way, the plugins that get shipping are per yarn
cluster request instead of on a global level.

Do you see any workarounds for the issue I described? Also, does the idea I
propose make sense as a solution?



On Wed, Jun 10, 2020 at 9:16 PM Yangze Guo  wrote:

> Hi, John,
>
> AFAIK, Flink will automatically help you to ship the "plugins/"
> directory of your Flink distribution to Yarn[1]. So, you just need to
> make a directory in "plugins/" and put your custom jar into it. Do you
> meet any problem with this approach?
>
> [1]
> https://github.com/apache/flink/blob/216f65fff10fb0957e324570662d075be66bacdf/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L770
>
> Best,
> Yangze Guo
>
> On Wed, Jun 10, 2020 at 11:29 PM John Mathews 
> wrote:
> >
> > Hello,
> >
> > I have a custom filesystem that I am trying to migrate to the plugins
> model described here:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#adding-a-new-pluggable-file-system-implementation,
> but it is unclear to me how to dynamically get the plugins directory to be
> available when launching using a Yarn Cluster Descriptor. One thought was
> to add the plugins to the shipFilesList, but I don't think that would
> result in the plugins being in the correct directory location for Flink to
> discover it.
> >
> > Is there another way to get the plugins onto the host when launching the
> cluster? Or is there a different recommended way of doing this? Happy to
> answer any questions if something is unclear.
> >
> > Thanks so much for your help!
> >
> > John
>


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2020-06-11 Thread Matt Magsombol
I'm not the original poster, but I'm running into this same issue. What you 
just described is exactly what I want. I presume you guys are using some 
variant of this helm 
https://github.com/docker-flink/examples/tree/master/helm/flink to configure 
your k8s cluster? I'm also assuming that this cluster is running as a job 
cluster and not a session cluster right?
If so, how did you guys set up the deployments.yaml file such that it picks up 
the latest savepoint from a savepoint directory ( and what happens if that 
savepoint directory is empty? This is for cases when we're starting a new 
cluster, new job from scratch and there's no need to recover from previous 
savepoint ).

On 2019/09/24 16:23:52, Hao Sun  wrote: 
> We always make a savepoint before we shutdown the job-cluster. So the
> savepoint is always the latest. When we fix a bug or change the job graph,
> it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
> uncaught exception, etc.
> 
> Maybe I do not understand your use case well, I do not see a need to start
> from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
> 
> Hao Sun
> 
> 
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov  wrote:
> 
> > AFAIK there's currently nothing implemented to solve this problem, but
> > working on a possible fix can be implemented on top of
> > https://github.com/lyft/flinkk8soperator
> >  which already
> > has a pretty fancy state machine for rolling upgrades. I'd love to be
> > involved as this is an issue I've been thinking about as well.
> >
> > Yuval
> >
> > On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
> > wrote:
> >
> >> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
> >> when deploying Flink jobs to start from savepoints using the job-cluster
> >> mode in Kubernetes.
> >>
> >> we're running a ~15 different jobs, all in job-cluster mode, using a mix
> >> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
> >> all long-running streaming jobs, all essentially acting as microservices.
> >> we're using Helm charts to configure all of our deployments.
> >>
> >> we have a number of use cases where we want to restart jobs from a
> >> savepoint to replay recent events, i.e. when we've enhanced the job logic
> >> or fixed a bug. but after the deployment we want to have the job resume
> >> it's "long-running" behavior, where any unplanned restarts resume from the
> >> latest checkpoint.
> >>
> >> the issue we run into is that any obvious/standard/idiomatic Kubernetes
> >> deployment includes the savepoint argument in the configuration. if the Job
> >> Manager container(s) have an unplanned restart, when they come back up they
> >> will start from the savepoint instead of resuming from the latest
> >> checkpoint. everything is working as configured, but that's not exactly
> >> what we want. we want the savepoint argument to be transient somehow (only
> >> used during the initial deployment), but Kubernetes doesn't really support
> >> the concept of transient configuration.
> >>
> >> i can see a couple of potential solutions that either involve custom code
> >> in the jobs or custom logic in the container (i.e. a custom entrypoint
> >> script that records that the configured savepoint has already been used in
> >> a file on a persistent volume or GCS, and potentially when/why/by which
> >> deployment). but these seem like unexpected and hacky solutions. before we
> >> head down that road i wanted to ask:
> >>
> >>- is this is already a solved problem that i've missed?
> >>- is this issue already on the community's radar?
> >>
> >> thanks in advance!
> >>
> >> --
> >> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> >> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
> >> 
> >> 
> >> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> >> It’s not just an IT conference, it’s “a complete learning and networking
> >> experience” 
> >> 
> >>
> >>
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
> >
> 


Re: FLINK SQL DDL写入hbase问题

2020-06-11 Thread Leonard Xu
Hi,

> 您是说将那几个jar都放到flink/lib下吗?

你看这个报错,是在flink的client提交作业的时候就抛出的异常,看起来是你client所在机器配置缺少了hbase的jar。
你在使用hbase集群,肯定线上是有hadoop集群的,hbase 的一些类是依赖了 hadoop 的依赖,所以你把 hbase lib下 jar 加到 
$HADOOP_CLASSPATH下,这样$HADOOP_CLASSPATH 下就有hbase所需的所有jar了,在flink的lib目录下,只需要添加 
flink-hbase_2.11 的依赖即可,flink作业启动脚本会检查$HADOOP_CLASSPATH,能够加载到所需的jar。

> hbase-server
> hbase-common
> hadoop-common


不推荐在 pom 里这样引入hadoop 和 hbase 的 jar 包,这样依赖容易冲突。

Best,
Leonard Xu

> 
> 在2020年06月11日 14:39,Leonard Xu 写道:
> Hi
> 你服务器上是否配置了hadoop_classpath? 建议hbase在试用时 用 hadoop_classpath + flink-hbase 
> jar,不然依赖问题会比较麻烦。
> 
> 祝好
> Leonard Xu
> 
> 在 2020年6月11日,14:24,酷酷的浑蛋  写道:
> 
> 
> 
> 在使用flink sql ddl语句向hbase中写的时候报如下错误:
> java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
> at 
> org.apache.flink.addons.hbase.HBaseUpsertTableSink.consumeDataStream(HBaseUpsertTableSink.java:87)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:141)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> 
> 
> 项目maven中已经引入过下面依赖
> hbase-server
> hbase-common
> hadoop-common
> flink-hbase_2.11
> 而且我看jar中是有HBaseConfiguration这个类的,为什么放到服务器上执行就报错呢,在本地执行没问题



Reading files from multiple subdirectories

2020-06-11 Thread Lorenzo Nicora
Hi,

related to the same case I am discussing in another thread, but not related
to AVRO this time :)

I need to ingest files a S3 Sink Kafka Connector periodically adds to an S3
bucket.
Files are bucketed by date time as it often happens.

Is there any way, using Flink only, to monitor a base-path and detect new
files in any subdirectories?
Or I need to use something external to move new files in a single directory?

I am currently using
env.readFile(inputFormat, path, PROCESS_CONTINUOUSLY, 6)
with AvroInputFormat, but it seems it can only monitor a single directory


Cheers
Lorenzo


Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Hi Arvit,

I followed your instructions for the breakpoint in
SpecificDatumReader.readField *with  AVRO 1.8.2*,

For all timestamp-millis fields (I have many):

Conversion conversion = ((SpecificRecordBase) r).getConversion(f.pos());


returns null for all timestamp-millis fields (I have many), so...

datum = readWithoutConversion(oldDatum, f.schema(), in);


is used instead and returns a *Long*



Not sure it's relevant, but in this version I have the explicit dependency
org.apache.avro:avro:1.8.2 and I am using the avro-maven-plugin (1.8.2) to
generate the record from .avsc with this configuration:


String
true
private
true



Cheers
Lorenzo


On Thu, 11 Jun 2020 at 13:11, Arvid Heise  wrote:

> Sorry forget my last mail, that was half-finished.
>
> Here is the real one:
>
> Hi Lorenzo,
>
> if you still have time to investigate.
>
> Your stack trace shows that all expected code paths have been taken.
> Conversions are there; although they look different than here, but that can
> be attributed to the avro upgrade.
>
> Could you put a breakpoint on SpecificDatumReader.readField, so that you
> can inspect the conversion for the timestamp field? You probably want to
> make it a conditional for f.name() == .
> The expected flow is that it should have a conversion that returns the
> joda time instead of the long. Then datum should be the converted joda time.
>
> @Override
> protected void readField(Object r, Schema.Field f, Object oldDatum,
>  ResolvingDecoder in, Object state)
> throws IOException {
>   if (r instanceof SpecificRecordBase) {
> Conversion conversion = ((SpecificRecordBase) 
> r).getConversion(f.pos());
>
> Object datum;
> if (conversion != null) {
>   datum = readWithConversion(
>   oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
> } else {
>   datum = readWithoutConversion(oldDatum, f.schema(), in);
> }
>
> getData().setField(r, f.name(), f.pos(), datum);
>
>   } else {
> super.readField(r, f, oldDatum, in, state);
>   }
> }
>
>
> On Thu, Jun 11, 2020 at 2:06 PM Arvid Heise  wrote:
>
>> Hi Lorenzo,
>>
>> if you still have time to investigate.
>>
>> Your stack trace shows that all expected code paths have been taken.
>> Conversions are there although they look different than here, but that can
>> be attributed to the avro upgrade.
>>
>> @Override
>> protected void readField(Object r, Schema.Field f, Object oldDatum,
>>  ResolvingDecoder in, Object state)
>> throws IOException {
>>   if (r instanceof SpecificRecordBase) {
>> Conversion conversion = ((SpecificRecordBase) 
>> r).getConversion(f.pos());
>>
>> Object datum;
>> if (conversion != null) {
>>   datum = readWithConversion(
>>   oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
>> } else {
>>   datum = readWithoutConversion(oldDatum, f.schema(), in);
>> }
>>
>> getData().setField(r, f.name(), f.pos(), datum);
>>
>>   } else {
>> super.readField(r, f, oldDatum, in, state);
>>   }
>> }
>>
>>
>> On Thu, Jun 11, 2020 at 1:27 PM Lorenzo Nicora 
>> wrote:
>>
>>>
>>> Thanks Gouwei,
>>>
>>> setting format.setReuseAvroValue(false) with 1.8.2-generated records
>>> does not solve the problem.
>>>
>>> 12:02:59,314 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
>>> checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>>> org.joda.time.DateTime
>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>> at
>>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at
>>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>> at
>>> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
>>> at
>>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>
>>>
>>> 
>>>
>>> Summarising, the only working combination seems to be:
>>>
>>>- Use AVRO 1.9.2 code generation, setting 
>>> dateTimeLogicalTypeImplementation
>>>= joda

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

2020-06-11 Thread Slotterback, Chris
Interestingly, it appears to have been related to the stream application design 
that was causing incremental checkpointing issues. Once the checkpoints started 
failing, they would cause a positive feedback loop of failure as more and more 
data built up to write, and other exceptions would pop up indirectly related to 
the root cause.

Resolution so far:
We noticed that there was always 1 troubled (join) operator that had a single 
subtask that always took orders-of-magnitude longer than the others. That 
operator is an intervaljoin that joins 1:many reports by key every window 
trigger, and noticed that one of our keys had a significantly higher number of 
reports than the others which we hypothesize is causing that one subtask to 
crumble under a larger-than-average skew. Since filtering out that particular 
key, checkpoints have become relatively stable and inline with their size and 
write time.

From: Congxian Qiu 
Date: Friday, June 5, 2020 at 10:42 PM
To: Arvid Heise 
Cc: "Slotterback, Chris" , 
"user@flink.apache.org" 
Subject: Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

Hi Chris

From the given exception, seems there is something wrong of the FileSystem, one 
reason is that Arvid gave (incremental checkpoint may generate too much small 
files)
You can turn off incremental checkpoint or try to increase the config 
`state.backend.fs.memory-threshold` to see if things become better

Best,
Congxian


Arvid Heise mailto:ar...@ververica.com>> 于2020年6月6日周六 
上午2:09写道:
Hi Chris,

could you also try what happens when you turn incremental checkpoints off?

Incremental checkpoints may create many small files which are a bad fit for 
HDFS. You could also evaluate other storage options (net drive, S3) if you find 
incremental checkpoints to be better.

On Tue, Jun 2, 2020 at 2:36 AM Slotterback, Chris 
mailto:chris_slotterb...@comcast.com>> wrote:
Congxian,


1. The checkpoints were failing with this exception scattered through the logs:
2020-06-01 21:04:37,930 WARN  org.apache.hadoop.hdfs.DataStreamer - 
DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/flink/flink-checkpoints/ade55daec06ee72aaf7ceade86c6e7a9/chk-1/2093792d-7ebb-4008-8e20-4daf1849c2d4
 could only be replicated to 0 nodes instead of minReplication (=1).

2. Yes, we are using incremental checkpointing
3. Currently our windows are configured to use the process function (we were 
doing aggregates before), which is my understanding that should make our state 
update/insert ratio lower, as we are building the liststates of each window 
over time and processing them on trigger.
4. We set the max concurrent checkpoints back to 1, it was originally 
configured to that and the checkpoints were taking too long to complete before 
the next checkpoint interval began.

Our tm’s are normally 3 slots (3_slots.png), we wanted to try running with 1 
slot (1_slot.png) and noticed the checkpoint times fell drastically, but with 1 
slot per tm our parallelism had to be dropped and our consumer lag was growing.



From: Congxian Qiu mailto:qcx978132...@gmail.com>>
Date: Friday, May 29, 2020 at 10:59 PM
To: "Slotterback, Chris" 
mailto:chris_slotterb...@comcast.com>>
Cc: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

Hi
From the given picture,
1. there were some checkpoint failed(but not because of timeout), could you 
please check why these checkpoint would fail?
2. The checkpoint data size is the delta size for current checkpoint[1], assume 
you using incremental checkpoint
3. In fig1 the checkpoint size is ~3G, but in fig 2 the delta size can grow to 
~ 15G, my gut feeling is that the state update/insert ratio for your program is 
very high? so that in one checkpoint you'll generate too much sst files
4. from fig 2 seems you configurate 
execution-checkpointing-max-concurrent-checkpoints[2] bigger than 1, could you 
please try to set it to 1 and have a try?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/checkpoint_monitoring.html#history-tab
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#execution-checkpointing-max-concurrent-checkpoints
Best,
Congxian


Slotterback, Chris 
mailto:chris_slotterb...@comcast.com>> 
于2020年5月30日周六 上午7:43写道:
Hi there,

We are trying to upgrade a flink app from using FsStateBackend to 
RocksDBStateBackend to reduce overhead memory requirements. When 

streaming restored state after restart

2020-06-11 Thread Adam Atrea
Hi,

I'm new to Flink - but after reading the documentation -

What would be the best approach to stream data from a restored state
following a job restart ?
Say I have a MapState that gets populated during streaming with various
computed results within a stateful operator.

Upon job restart or on task failure recovery I am looking to stream the
data from that restored state downstream so that the operator will replay
it.
The data just to be directed to the downstream - at least once - so
multiple tasks may submit the same data.

Thanks,

Adam


如何做checkpoint的灾备

2020-06-11 Thread dixingxin...@163.com
Hi Flink社区,
目前我们在调研checkpoint 
跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?



Best,
Xingxing Di


Re: Reading from AVRO files

2020-06-11 Thread Arvid Heise
Sorry forget my last mail, that was half-finished.

Here is the real one:

Hi Lorenzo,

if you still have time to investigate.

Your stack trace shows that all expected code paths have been taken.
Conversions are there; although they look different than here, but that can
be attributed to the avro upgrade.

Could you put a breakpoint on SpecificDatumReader.readField, so that you
can inspect the conversion for the timestamp field? You probably want to
make it a conditional for f.name() == .
The expected flow is that it should have a conversion that returns the joda
time instead of the long. Then datum should be the converted joda time.

@Override
protected void readField(Object r, Schema.Field f, Object oldDatum,
 ResolvingDecoder in, Object state)
throws IOException {
  if (r instanceof SpecificRecordBase) {
Conversion conversion = ((SpecificRecordBase) r).getConversion(f.pos());

Object datum;
if (conversion != null) {
  datum = readWithConversion(
  oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
} else {
  datum = readWithoutConversion(oldDatum, f.schema(), in);
}

getData().setField(r, f.name(), f.pos(), datum);

  } else {
super.readField(r, f, oldDatum, in, state);
  }
}


On Thu, Jun 11, 2020 at 2:06 PM Arvid Heise  wrote:

> Hi Lorenzo,
>
> if you still have time to investigate.
>
> Your stack trace shows that all expected code paths have been taken.
> Conversions are there although they look different than here, but that can
> be attributed to the avro upgrade.
>
> @Override
> protected void readField(Object r, Schema.Field f, Object oldDatum,
>  ResolvingDecoder in, Object state)
> throws IOException {
>   if (r instanceof SpecificRecordBase) {
> Conversion conversion = ((SpecificRecordBase) 
> r).getConversion(f.pos());
>
> Object datum;
> if (conversion != null) {
>   datum = readWithConversion(
>   oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
> } else {
>   datum = readWithoutConversion(oldDatum, f.schema(), in);
> }
>
> getData().setField(r, f.name(), f.pos(), datum);
>
>   } else {
> super.readField(r, f, oldDatum, in, state);
>   }
> }
>
>
> On Thu, Jun 11, 2020 at 1:27 PM Lorenzo Nicora 
> wrote:
>
>>
>> Thanks Gouwei,
>>
>> setting format.setReuseAvroValue(false) with 1.8.2-generated records
>> does not solve the problem.
>>
>> 12:02:59,314 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
>> checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>> org.joda.time.DateTime
>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>> at
>> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>
>>
>> 
>>
>> Summarising, the only working combination seems to be:
>>
>>- Use AVRO 1.9.2 code generation, setting 
>> dateTimeLogicalTypeImplementation
>>= joda
>>- Enabling Object Reuse (being careful for the implications)
>>
>> Using AVRO 1.8.2 code generation does not work, with any of the other
>> workarounds.
>> Using Generic objects does not work for a bug in AvroSerializer
>>  but GenericRecords
>> also brings a number of other problems.
>>
>> I am not very comfortable with using AVRO objects generated with a
>> different AVRO version than the one supported by Flink.
>> I am going to map AVRO records into hand-written POJOs immediately after
>> the ingestion to reduce chances of further issues. I reckon this is very
>> empirical, but that's what the workaround looks to me :)
>>
>> Lorenzo
>>
>> P.S, I want to give a massive thank to this community. So far it has been
>> one of the most reactive and helpful I ever interacted with.
>>
>> On Thu, 11 Jun 2020 at 

Re: Reading from AVRO files

2020-06-11 Thread Arvid Heise
Hi Lorenzo,

if you still have time to investigate.

Your stack trace shows that all expected code paths have been taken.
Conversions are there although they look different than here, but that can
be attributed to the avro upgrade.

@Override
protected void readField(Object r, Schema.Field f, Object oldDatum,
 ResolvingDecoder in, Object state)
throws IOException {
  if (r instanceof SpecificRecordBase) {
Conversion conversion = ((SpecificRecordBase) r).getConversion(f.pos());

Object datum;
if (conversion != null) {
  datum = readWithConversion(
  oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
} else {
  datum = readWithoutConversion(oldDatum, f.schema(), in);
}

getData().setField(r, f.name(), f.pos(), datum);

  } else {
super.readField(r, f, oldDatum, in, state);
  }
}


On Thu, Jun 11, 2020 at 1:27 PM Lorenzo Nicora 
wrote:

>
> Thanks Gouwei,
>
> setting format.setReuseAvroValue(false) with 1.8.2-generated records does
> not solve the problem.
>
> 12:02:59,314 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
> checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
> java.lang.ClassCastException: java.lang.Long cannot be cast to
> org.joda.time.DateTime
> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
> at
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> at
> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>
>
> 
>
> Summarising, the only working combination seems to be:
>
>- Use AVRO 1.9.2 code generation, setting dateTimeLogicalTypeImplementation
>= joda
>- Enabling Object Reuse (being careful for the implications)
>
> Using AVRO 1.8.2 code generation does not work, with any of the other
> workarounds.
> Using Generic objects does not work for a bug in AvroSerializer
>  but GenericRecords
> also brings a number of other problems.
>
> I am not very comfortable with using AVRO objects generated with a
> different AVRO version than the one supported by Flink.
> I am going to map AVRO records into hand-written POJOs immediately after
> the ingestion to reduce chances of further issues. I reckon this is very
> empirical, but that's what the workaround looks to me :)
>
> Lorenzo
>
> P.S, I want to give a massive thank to this community. So far it has been
> one of the most reactive and helpful I ever interacted with.
>
> On Thu, 11 Jun 2020 at 10:25, Guowei Ma  wrote:
>
>> Hi,
>> for the 1.8.2(case 1) you could try the format.setReuseAvroValue(false);
>>
>> Best,
>> Guowei
>>
>>
>> Lorenzo Nicora  于2020年6月11日周四 下午5:02写道:
>>
>>> Hi Arvid,
>>>
>>> thanks for the point about catching records. Gotcha!
>>>
>>> Sorry I cannot share the full schema or generated code. It's a 3rd party
>>> IP and we signed a meter-think NDA... I think I can post snippets.
>>> The schema is heavily nested, including arrays of other record types
>>> Types are primitives, or logical decimal and timestamp-millis. No union.
>>>
>>> #conversion is in AccountEntries only (one of the nested records) and
>>> looks like this:
>>>
>>> private static final org.apache.avro.Conversion[] conversions =
>>> new org.apache.avro.Conversion[] {
>>> null,
>>> null,
>>> null,
>>> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>> null,
>>> null,
>>> null,
>>> null,
>>> null,
>>> null,
>>> null
>>> };
>>>
>>>
>>> Note that I have to generate the specific object with AVRO 1.9.2 Maven
>>> Plugin.
>>> With 1.8.2 generated code it fails with the following exception,
>>> regardless setting enableObjectReuse()
>>>
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>>> org.joda.time.DateTime
>>> at 

Re: Timer metric in Flink

2020-06-11 Thread Chesnay Schepler
There are no immediate plans, mostly because timers are fairly expensive 
and represent an easy trap to trashing performance of invalidating 
benchmark results.


On 11/06/2020 13:11, Vinay Patil wrote:
Ohh Okay, basically implement the Gauge and add timer functionality to 
it for now.


Is there a plan or JIRA ticket to add Timer metric in future release, 
I think it is good to have


Regards,
Vinay Patil


On Wed, Jun 10, 2020 at 5:55 PM Chesnay Schepler > wrote:


You cannot add custom metric types, just implementations of the
existing ones. Your timer(wrapper) will have to implement Gauge or
Histogram.

On 10/06/2020 14:17, Vinay Patil wrote:

Hi,

As timer metric is not provided out of the box, can I create a
new MetricGroup by implementing this interface and add timer
capability, this will be similar to Histogram wrapper Flink has
provided. If yes, I can create a wrapper like

`public TimerWrapper implements Timer` , in this case will also
have to create Timer interface and add it to the metric group.

Is this possible?

I want to have a timer to check Hbase lookup time.

Regards,
Vinay Patil







Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Thanks Gouwei,

setting format.setReuseAvroValue(false) with 1.8.2-generated records does
not solve the problem.

12:02:59,314 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at
org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)




Summarising, the only working combination seems to be:

   - Use AVRO 1.9.2 code generation, setting dateTimeLogicalTypeImplementation
   = joda
   - Enabling Object Reuse (being careful for the implications)

Using AVRO 1.8.2 code generation does not work, with any of the other
workarounds.
Using Generic objects does not work for a bug in AvroSerializer
 but GenericRecords also
brings a number of other problems.

I am not very comfortable with using AVRO objects generated with a
different AVRO version than the one supported by Flink.
I am going to map AVRO records into hand-written POJOs immediately after
the ingestion to reduce chances of further issues. I reckon this is very
empirical, but that's what the workaround looks to me :)

Lorenzo

P.S, I want to give a massive thank to this community. So far it has been
one of the most reactive and helpful I ever interacted with.

On Thu, 11 Jun 2020 at 10:25, Guowei Ma  wrote:

> Hi,
> for the 1.8.2(case 1) you could try the format.setReuseAvroValue(false);
>
> Best,
> Guowei
>
>
> Lorenzo Nicora  于2020年6月11日周四 下午5:02写道:
>
>> Hi Arvid,
>>
>> thanks for the point about catching records. Gotcha!
>>
>> Sorry I cannot share the full schema or generated code. It's a 3rd party
>> IP and we signed a meter-think NDA... I think I can post snippets.
>> The schema is heavily nested, including arrays of other record types
>> Types are primitives, or logical decimal and timestamp-millis. No union.
>>
>> #conversion is in AccountEntries only (one of the nested records) and
>> looks like this:
>>
>> private static final org.apache.avro.Conversion[] conversions =
>> new org.apache.avro.Conversion[] {
>> null,
>> null,
>> null,
>> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>> null,
>> null,
>> null,
>> null,
>> null,
>> null,
>> null
>> };
>>
>>
>> Note that I have to generate the specific object with AVRO 1.9.2 Maven
>> Plugin.
>> With 1.8.2 generated code it fails with the following exception,
>> regardless setting enableObjectReuse()
>>
>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>> org.joda.time.DateTime
>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>> at
>> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>> at
>> 

Re: Timer metric in Flink

2020-06-11 Thread Vinay Patil
Ohh Okay, basically implement the Gauge and add timer functionality to it
for now.

Is there a plan or JIRA ticket to add Timer metric in future release, I
think it is good to have

Regards,
Vinay Patil


On Wed, Jun 10, 2020 at 5:55 PM Chesnay Schepler  wrote:

> You cannot add custom metric types, just implementations of the existing
> ones. Your timer(wrapper) will have to implement Gauge or Histogram.
>
> On 10/06/2020 14:17, Vinay Patil wrote:
>
> Hi,
>
> As timer metric is not provided out of the box, can I create a new
> MetricGroup by implementing this interface and add timer capability, this
> will be similar to Histogram wrapper Flink has provided. If yes, I can
> create a wrapper like
>
> `public TimerWrapper implements Timer` , in this case will also have to
> create Timer interface and add it to the metric group.
>
> Is this possible?
>
> I want to have a timer to check Hbase lookup time.
>
> Regards,
> Vinay Patil
>
>
>


Re: flink精准一次性消费问题

2020-06-11 Thread tison
>checkpoint的配置有什么要求吗?

配成 EXACTLY_ONCE

>还有就是kafka的事务提交多久能提交一次,可配置吗?

chk 的时候提交,这里面深究的话有点并发问题,可以看 TwoPhaseCommitSink 的细节
配置这个事儿...有能力自定义,但是为啥要这么做呢呢

Best,
tison.


蒋佳成(Jiacheng Jiang) <920334...@qq.com> 于2020年6月11日周四 下午4:59写道:

> checkpoint完成通知里提交的事务
>
>
>
>
> --原始邮件--
> 发件人: "胡云川" 发送时间: 2020年6月11日(星期四) 下午4:56
> 收件人: "user-zh" 主题: 回复:flink精准一次性消费问题
>
>
>
> gt;Hi
> gt;这些问题都已经排查过了,
> gt;有一个问题,在做exctly-once的时候,
> gt;checkpoint的配置有什么要求吗?
> gt;还有就是kafka的事务提交多久能提交一次,可配置吗?
> gt;望解答,谢谢各位!
>
>
>
>
>
>
> --nbsp;原始邮件nbsp;--
> 发件人:nbsp;"Matt Wang" 发送时间:nbsp;2020年6月10日(星期三) 晚上7:39
> 收件人:nbsp;"user-zh@flink.apache.org"
> 主题:nbsp;Re:flink精准一次性消费问题
>
>
>
> kafka 从 0.11.0 开始支持事务写,在 flink 中如果开启了 EXACTLY-ONCE,数据会先 send 到 kafka,但在未调用
> commit 之前,这部分数据是数据是属于未完成事务的数据,站在 kafka
> 的角度,数据还是会存储下来的,只不过下游在消费的时候,根据nbsp; isolation.level 设置来决定是否能消费到未 commit
> 的数据。
>
>
> ---
> Best,
> Matt Wang
>
>
> On 06/10/2020 14:28,Yichao Yang<1048262...@qq.comgt; wrote:
> Hi
>
>
> sinkamp;nbsp;
> 为kafka时,需要kafka版本大于1.0,并且kafka端也要开启两阶段提交功能才能满足EXACTLY-ONCE。可以检查下你的配置是否都满足。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --amp;nbsp;原始邮件amp;nbsp;--
> 发件人:amp;nbsp;"胡云川" 发送时间:amp;nbsp;2020年6月10日(星期三) 下午2:25
> 收件人:amp;nbsp;"user-zh"
> 主题:amp;nbsp;flink精准一次性消费问题
>
>
>
> amp;amp;gt;Hi,
> amp;amp;gt;在使用flink往kafka写入数据时,使用了EXACTLY-ONCE,但是
>
> amp;amp;gt;在debug测试的时候,发现数据在invoke方法里的traction.producer.send()的时候数据就已经传过去了,没有通过precommit和commit方法
> amp;amp;gt;请问大家可以解释一下吗?谢谢!


[BUG] ColumnFamilyHandle*, _jbyteArray*, int, int)+0xce[thread 140474763392768 also had an error]

2020-06-11 Thread liber xue
   1. #
   2. # A fatal error has been detected by the Java Runtime Environment:
   3. #
   4. # SIGSEGV (0xb) at pc=0x7fc2dd3d045e, pid=409,
   tid=0x7fc2d46f4700
   5. #
   6. # JRE version: OpenJDK Runtime Environment (8.0_222-b10) (build
   1.8.0_222-b10)
   7. # Java VM: OpenJDK 64-Bit Server VM (25.222-b10 mixed mode
   linux-amd64 compressed oops)
   8. # Problematic frame:
   9. # C [librocksdbjni-linux64.so+0x2fe45e]
   rocksdb_delete_helper(JNIEnv_*, rocksdb::DB*, rocksdb::WriteOptions const&,
   rocksdb::ColumnFamilyHandle*, _jbyteArray*, int, int)+0xce[thread
   140474763392768 also had an error]
   10.
   11. #
   12. # Core dump written. Default location: /opt/flink/core or core.409
   13. #
   14. # An error report file with more information is saved as:
   15. # /opt/flink/hs_err_pid409.log
   16. [thread 140474762340096 also had an error]
   17. [thread 140474761287424 also had an error]
   18. [thread 140474760234752 also had an error]
   19. [thread 140474759182080 also had an error]
   20. [thread 140474758129408 also had an error]
   21. [thread 140474748684032 also had an error]
   22. [thread 140474747631360 also had an error]
   23. [thread 140474746578688 also had an error]
   24. [thread 140474745526016 also had an error]
   25. [thread 140474744473344 also had an error]
   26. [thread 140474743420672 also had an error]
   27. [thread 140474742368000 also had an error]
   28. [thread 140474741315328 also had an error]
   29. [thread 140474740262656 also had an error]
   30. [thread 140474739209984 also had an error]
   31. [thread 140474738157312 also had an error]
   32. [thread 140474737104640 also had an error]
   33. [thread 140474736051968 also had an error]
   34. [thread 140474734999296 also had an error]
   35. [thread 140474733946624 also had an error]
   36. [thread 140474732893952 also had an error]
   37. [thread 140474731841280 also had an error]
   38. #
   39. # If you would like to submit a bug report, please visit:
   40. # http://bugreport.java.com/bugreport/crash.jsp
   41. # The crash happened outside the Java Virtual Machine in native code.
   42. # See problematic frame for where to report the bug.
   43. #
   44. command terminated with exit code 137


Restore from savepoint through Java API

2020-06-11 Thread Abhishek Rai
Hello,

I'm writing a test for my custom sink function.  The function is stateful
and relies on checkpoint restores for maintaining consistency with the
external system that it's writing to.  For integration testing of the sink
function, I have a MiniCluster based environment inside a single JVM
through which I create my job and validate its operation.

In order to test the checkpoint restore behavior with precision, I've
disabled checkpointing and am instead using savepoints.  So, my test
proceeds as follows:

1. Start a job.
2. Push some data through it to the sink and to an external system.
3. Trigger a savepoint.
4. Push more data.
5. Cancel the job.
6. Restore from the savepoint captured in step 3 above.

I can't seem to find a Java API for restoring a job from a savepoint.  The
approach in the documentation and other resources is to use the CLI, which
is not an option for me.  Currently, I create a RemoteStreamEnvironment
with savepointRestoreSettings set, but when I run execute(), I get the
following error:

java.lang.IllegalStateException: No operators defined in streaming
topology. Cannot execute.

var savepointDir =
restClusterClient_.triggerSavepoint(jobId, tmpdir).get();
assertTrue(!savepointDir.isBlank());
// Cancel the job and launch a new one from the save point.
restClusterClient_.cancel(jobId).get();
var restoreSettings = SavepointRestoreSettings.forPath(savepointDir);
var env = new RemoteStreamEnvironment(
flinkMiniCluster_.host(),
flinkMiniCluster_.port(),
null,
new String[] {},
null,
restoreSettings);
var restoredJob = env.executeAsync();


Separately, is there a flink testing utility I could use for integration
testing of state checkpointing and recovery?

Thanks,
Abhishek


Re: Reading from AVRO files

2020-06-11 Thread Guowei Ma
Hi,
for the 1.8.2(case 1) you could try the format.setReuseAvroValue(false);

Best,
Guowei


Lorenzo Nicora  于2020年6月11日周四 下午5:02写道:

> Hi Arvid,
>
> thanks for the point about catching records. Gotcha!
>
> Sorry I cannot share the full schema or generated code. It's a 3rd party
> IP and we signed a meter-think NDA... I think I can post snippets.
> The schema is heavily nested, including arrays of other record types
> Types are primitives, or logical decimal and timestamp-millis. No union.
>
> #conversion is in AccountEntries only (one of the nested records) and
> looks like this:
>
> private static final org.apache.avro.Conversion[] conversions =
> new org.apache.avro.Conversion[] {
> null,
> null,
> null,
> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
> new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
> null,
> null,
> null,
> null,
> null,
> null,
> null
> };
>
>
> Note that I have to generate the specific object with AVRO 1.9.2 Maven
> Plugin.
> With 1.8.2 generated code it fails with the following exception,
> regardless setting enableObjectReuse()
>
> java.lang.ClassCastException: java.lang.Long cannot be cast to
> org.joda.time.DateTime
> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
> at
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> at
> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>
>
> Thanks for the help
> Lorenzo
>
>
> On Thu, 11 Jun 2020 at 08:58, Arvid Heise  wrote:
>
>> Hi Lorenzo,
>>
>> I'm glad that it worked out somehow, but I'd still like to understand
>> what went wrong, so it will work more smoothly for future users. I double
>> checked and we even test AvroSerializer with logical types, so I'm a bit
>> puzzled.
>>
>> Could you attach GlHeader or at least show us how GlHeader#conversions look
>> like? I want to exclude the possibility that the source generator screwed
>> up.
>>
>> Concerning object reuse is that you need to treat all POJO as immutable
>> (I'm assuming that that's what your meant from your description), but you
>> should also never cache values like
>> class ShiftElements extends MapFunction {
>>   Object lastElement;
>>
>>   Object map(Object newElement, Collector out) {
>> out.collect(lastElement);
>> lastElement = newElement; // <- never cache with enableObjectReuse
>>   }
>> }
>>
>> (excuse my ugly code)
>>
>> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora 
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> answering to your other questions
>>>
>>> Here is the stacktrace of the case (1),  when I try to read using
>>> specific records generated by the AVRO 1.8.2 plugin
>>>
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>>> org.joda.time.DateTime
>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>> at
>>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at
>>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>> at
>>> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>> at
>>> 

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Hi Arvid,

thanks for the point about catching records. Gotcha!

Sorry I cannot share the full schema or generated code. It's a 3rd party IP
and we signed a meter-think NDA... I think I can post snippets.
The schema is heavily nested, including arrays of other record types
Types are primitives, or logical decimal and timestamp-millis. No union.

#conversion is in AccountEntries only (one of the nested records) and looks
like this:

private static final org.apache.avro.Conversion[] conversions =
new org.apache.avro.Conversion[] {
null,
null,
null,
new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
null,
null,
null,
null,
null,
null,
null
};


Note that I have to generate the specific object with AVRO 1.9.2 Maven
Plugin.
With 1.8.2 generated code it fails with the following exception, regardless
setting enableObjectReuse()

java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at
org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)


Thanks for the help
Lorenzo


On Thu, 11 Jun 2020 at 08:58, Arvid Heise  wrote:

> Hi Lorenzo,
>
> I'm glad that it worked out somehow, but I'd still like to understand what
> went wrong, so it will work more smoothly for future users. I double
> checked and we even test AvroSerializer with logical types, so I'm a bit
> puzzled.
>
> Could you attach GlHeader or at least show us how GlHeader#conversions look
> like? I want to exclude the possibility that the source generator screwed
> up.
>
> Concerning object reuse is that you need to treat all POJO as immutable
> (I'm assuming that that's what your meant from your description), but you
> should also never cache values like
> class ShiftElements extends MapFunction {
>   Object lastElement;
>
>   Object map(Object newElement, Collector out) {
> out.collect(lastElement);
> lastElement = newElement; // <- never cache with enableObjectReuse
>   }
> }
>
> (excuse my ugly code)
>
> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora 
> wrote:
>
>> Hi Arvid,
>>
>> answering to your other questions
>>
>> Here is the stacktrace of the case (1),  when I try to read using
>> specific records generated by the AVRO 1.8.2 plugin
>>
>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>> org.joda.time.DateTime
>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>> at
>> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>
>>
>> I also tried generating the specific object with avro 1.9.2 (2)  but
>> forcing it to use Joda time but still didn't work
>>
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to 

??????flink??????????????????

2020-06-11 Thread ??????(Jiacheng Jiang)
checkpoint




----
??: "??"

??????flink??????????????????

2020-06-11 Thread ??????
Hi

exctly-once
checkpoint
kafka
??






----
??:"Matt Wang"

??????flink????????????????

2020-06-11 Thread Yichao Yang
Hi


broadcast??


Best,
Yichao Yang




----
??:"xue...@outlook.com"https://go.microsoft.com/fwlink/?LinkId=550986;

?????? ????flinksql between????

2020-06-11 Thread ??????
 joinmysql?? group by 
?? A(id,ip) 
mysql??B(startip,endip,area_id)
??A.ip between(B.startip,B.endIp) ??area_id 
??area_id
sql betweenID ??
??
  val table = tnv.sqlQuery("select a.*,b.area_id as 
s_area_id,b.unit_id as s_unit_id,(ip_to_num(b.end_ip)-ip_to_num(b.start_ip)) as 
scoped from OMstream as a left join sqlStream as b on 
ip_to_num(a.s_ip)  ip_to_num(b.start_ip) and ip_to_num(a.s_ip) 
http://logging.apache.org/log4j/1.2/faq.html#noconfig 


Re: Reading from AVRO files

2020-06-11 Thread Guowei Ma
Hi,
I write a test for case 1 but it does not throw any exception. I use
the org.apache.flink.formats.avro.generated.JodaTimeRecord for the test.
Could you check whether AccountEntries.class  has following code:

private static final org.apache.avro.Conversion[] conversions =
new org.apache.avro.Conversion[] {
TIMESTAMP_CONVERSION,
DATE_CONVERSION,
TIME_CONVERSION,
null
};

Best,
Guowei

Best,
Guowei


Guowei Ma  于2020年6月11日周四 下午4:12写道:

> Hi,
> I write a test for the case 1 but it does not throw any exception. I use
> the org.apache.flink.formats.avro.generated.JodaTimeRecord for the test.
> Best,
> Guowei
>
>
> Arvid Heise  于2020年6月11日周四 下午3:58写道:
>
>> Hi Lorenzo,
>>
>> I'm glad that it worked out somehow, but I'd still like to understand
>> what went wrong, so it will work more smoothly for future users. I double
>> checked and we even test AvroSerializer with logical types, so I'm a bit
>> puzzled.
>>
>> Could you attach GlHeader or at least show us how GlHeader#conversions look
>> like? I want to exclude the possibility that the source generator screwed
>> up.
>>
>> Concerning object reuse is that you need to treat all POJO as immutable
>> (I'm assuming that that's what your meant from your description), but you
>> should also never cache values like
>> class ShiftElements extends MapFunction {
>>   Object lastElement;
>>
>>   Object map(Object newElement, Collector out) {
>> out.collect(lastElement);
>> lastElement = newElement; // <- never cache with enableObjectReuse
>>   }
>> }
>>
>> (excuse my ugly code)
>>
>> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora 
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> answering to your other questions
>>>
>>> Here is the stacktrace of the case (1),  when I try to read using
>>> specific records generated by the AVRO 1.8.2 plugin
>>>
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>>> org.joda.time.DateTime
>>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>> at
>>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at
>>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>> at
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>> at
>>> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>> at
>>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>
>>>
>>> I also tried generating the specific object with avro 1.9.2 (2)  but
>>> forcing it to use Joda time but still didn't work
>>>
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>> Could not forward element to next operator
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>> at
>>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
>>> org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>>> at
>>> org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>> at
>>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>> at 

Re: Reading from AVRO files

2020-06-11 Thread Guowei Ma
Hi,
I write a test for the case 1 but it does not throw any exception. I use
the org.apache.flink.formats.avro.generated.JodaTimeRecord for the test.
Best,
Guowei


Arvid Heise  于2020年6月11日周四 下午3:58写道:

> Hi Lorenzo,
>
> I'm glad that it worked out somehow, but I'd still like to understand what
> went wrong, so it will work more smoothly for future users. I double
> checked and we even test AvroSerializer with logical types, so I'm a bit
> puzzled.
>
> Could you attach GlHeader or at least show us how GlHeader#conversions look
> like? I want to exclude the possibility that the source generator screwed
> up.
>
> Concerning object reuse is that you need to treat all POJO as immutable
> (I'm assuming that that's what your meant from your description), but you
> should also never cache values like
> class ShiftElements extends MapFunction {
>   Object lastElement;
>
>   Object map(Object newElement, Collector out) {
> out.collect(lastElement);
> lastElement = newElement; // <- never cache with enableObjectReuse
>   }
> }
>
> (excuse my ugly code)
>
> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora 
> wrote:
>
>> Hi Arvid,
>>
>> answering to your other questions
>>
>> Here is the stacktrace of the case (1),  when I try to read using
>> specific records generated by the AVRO 1.8.2 plugin
>>
>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>> org.joda.time.DateTime
>> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>> at
>> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>
>>
>> I also tried generating the specific object with avro 1.9.2 (2)  but
>> forcing it to use Joda time but still didn't work
>>
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
>> org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>> at
>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>> at
>> org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>> ... 7 more
>>
>>
>> But in the second case, it seems the 

Re: Reading from AVRO files

2020-06-11 Thread Arvid Heise
Hi Lorenzo,

I'm glad that it worked out somehow, but I'd still like to understand what
went wrong, so it will work more smoothly for future users. I double
checked and we even test AvroSerializer with logical types, so I'm a bit
puzzled.

Could you attach GlHeader or at least show us how GlHeader#conversions look
like? I want to exclude the possibility that the source generator screwed
up.

Concerning object reuse is that you need to treat all POJO as immutable
(I'm assuming that that's what your meant from your description), but you
should also never cache values like
class ShiftElements extends MapFunction {
  Object lastElement;

  Object map(Object newElement, Collector out) {
out.collect(lastElement);
lastElement = newElement; // <- never cache with enableObjectReuse
  }
}

(excuse my ugly code)

On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora 
wrote:

> Hi Arvid,
>
> answering to your other questions
>
> Here is the stacktrace of the case (1),  when I try to read using specific
> records generated by the AVRO 1.8.2 plugin
>
> java.lang.ClassCastException: java.lang.Long cannot be cast to
> org.joda.time.DateTime
> at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
> at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
> at
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
> at
> org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>
>
> I also tried generating the specific object with avro 1.9.2 (2)  but
> forcing it to use Joda time but still didn't work
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
> org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
> at
> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
> at
> org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
> ... 7 more
>
>
> But in the second case, it seems the failure happens when Flink tries to
> make a copy of the record.
> So I followed your suggestion of enableObjectReuse() and* IT WORKS!*
>
> I am not sure I understand all implications of object reuse in Flink,
> specifically.
> I am familiar with the general risk of mutable messages, and I always
> handle them as mutable even when 

flink预加载数据的方式

2020-06-11 Thread xue...@outlook.com
环境背景:flink 1.10 standalone cluster


原因:因数据处理,需要动态的增加OutputTag做数据流的分流

即因业务原因,需要有些A数据先运行计算好后,才能被使用B、C、D类数据流结合处理后的结果后的结果流,才能使用A数据流。但A是变化的。

目的:除了Rich方式在open中能够预加载数据外(即先于数据流处理其他数据),是否还有其他方式

在flink的流式计算中没有什么比较好的办法,实施流之间的数据结果依赖。
我试过使用流式数据处理A;使用流式数据处理{B、C、D}在Function_XXX中使用A的结果流
但是Function_XXX中使用A的结果时,A的结果是空的。
尝试过A的数据处理改成DataSet方式batch处理后collection()得到List后,在Function_XXX中使用List,但List也是空集。

因基于双流join在具体的业务上时间跨度太大等原因不适合。

发送自 Windows 10 版邮件应用



????: ??????flink on yarn??????????????????????

2020-06-11 Thread zjfpla...@hotmail.com
yarn??java -cp??



zjfpla...@hotmail.com
 
 Yichao Yang
?? 2020-06-11 15:53
 user-zh
?? ??flink on yarn??
Hi
 
 
yarnyarnyarn??
 
 
Best,
Yichao Yang
 
 
 
 
----
??:"zjfpla...@hotmail.com"

??????flink on yarn??????????????????????

2020-06-11 Thread Yichao Yang
Hi


yarnyarnyarn??


Best,
Yichao Yang




----
??:"zjfpla...@hotmail.com"

Re: How to use Hbase Connector Sink

2020-06-11 Thread Caizhi Weng
Hi,

The stack trace indicates that your query schema does not match with your
sink schema. It seems that `active_ratio*25 score` in your query is a
double value, not a `ROW` you declared in your sink.

op <520075...@qq.com> 于2020年6月11日周四 下午3:31写道:

> hi
> flink1.10,wen i want to sink data to hbase table like this:
>
>  bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
>rowkey String,
>info ROW
>  ) WITH (
>'connector.type' = 'hbase',
>'connector.version' = '1.4.3',
>'connector.table-name' = 'ms:test_circle_info',
>'connector.zookeeper.quorum' = 'localhost:2181',
>'connector.zookeeper.znode.parent' =
> '/hbase-secure',
>'connector.write.buffer-flush.max-size' =
> '10mb',
>'connector.write.buffer-flush.max-rows' =
> '1000',
>'connector.write.buffer-flush.interval' = '2s'
>  )""")
>
> bstEnv.sqlUpdate(
>   """
> |insert into circle_weight
> |select
> |concat_ws('_',circleName,dt) rowkey,
> |active_ratio*25 score
> |from tb""")
>
> but i get following exceptions,can anybody tell me what is wrong?
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field types of query result and registered TableSink
> default_catalog.default_database.circle_weight do not match.
> Query schema: [rowkey: STRING, score: DOUBLE]
> Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>]
> at
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
> at scala.Option.map(Option.scala:146)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at
> com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170)
> at
> com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala)
>


Re: How to use Hbase Connector Sink

2020-06-11 Thread godfrey he
hi,

you should make sure the types of the selected fields and the types of sink
table are the same,
otherwise you will get the above exception. you can change `active_ratio*25
score` to row type, just like:

insert into circle_weight select rowkey, ROW(info) from (
select concat_ws('_',circleName,dt) rowkey, active_ratio*25 score as info
from tb) t;


Best,
Godfrey

op <520075...@qq.com> 于2020年6月11日周四 下午3:31写道:

> hi
> flink1.10,wen i want to sink data to hbase table like this:
>
>  bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
>rowkey String,
>info ROW
>  ) WITH (
>'connector.type' = 'hbase',
>'connector.version' = '1.4.3',
>'connector.table-name' = 'ms:test_circle_info',
>'connector.zookeeper.quorum' = 'localhost:2181',
>'connector.zookeeper.znode.parent' =
> '/hbase-secure',
>'connector.write.buffer-flush.max-size' =
> '10mb',
>'connector.write.buffer-flush.max-rows' =
> '1000',
>'connector.write.buffer-flush.interval' = '2s'
>  )""")
>
> bstEnv.sqlUpdate(
>   """
> |insert into circle_weight
> |select
> |concat_ws('_',circleName,dt) rowkey,
> |active_ratio*25 score
> |from tb""")
>
> but i get following exceptions,can anybody tell me what is wrong?
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field types of query result and registered TableSink
> default_catalog.default_database.circle_weight do not match.
> Query schema: [rowkey: STRING, score: DOUBLE]
> Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>]
> at
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
> at scala.Option.map(Option.scala:146)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at
> com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170)
> at
> com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala)
>


How to use Hbase Connector Sink

2020-06-11 Thread op
hi
flink1.10??wen i want to sink data to hbase table like this??


bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
   
  rowkey String,
   
  info ROW

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Hi Arvid,

answering to your other questions

Here is the stacktrace of the case (1),  when I try to read using specific
records generated by the AVRO 1.8.2 plugin

java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at
org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)


I also tried generating the specific object with avro 1.9.2 (2)  but
forcing it to use Joda time but still didn't work

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
at
org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
... 7 more


But in the second case, it seems the failure happens when Flink tries to
make a copy of the record.
So I followed your suggestion of enableObjectReuse() and* IT WORKS!*

I am not sure I understand all implications of object reuse in Flink,
specifically.
I am familiar with the general risk of mutable messages, and I always
handle them as mutable even when they are POJO. Never mutating and
forwarding the same record.
Not sure whether there are other implications in Flink.

Many thanks
Lorenzo


On Wed, 10 Jun 2020 at 17:52, Arvid Heise  wrote:

> Hi Lorenzo,
>
> 1) I'm surprised that this doesn't work. I'd like to see that stacktrace.
>
> 2) cannot work like this, because we bundle Avro 1.8.2. You could retest
> with dateTimeLogicalType='Joda' set, but then you will probably see the
> same issue as 1)
>
> 3) I'm surprised that this doesn't work either. There is a codepath since
> 2016 for GenericRecord and it's covered in a test. From the error
> description and the ticket, it looks like the issue is not the
> AvroInputFormat, but the serializer. So it would probably work with a
> different serializer (but that would cause back and forth type
> transformation).
>
> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora 
> wrote:
>
>> Thanks Timo,
>>
>> the stacktrace with 1.9.2-generated specific file is the following
>>
>> 

回复: FLINK SQL DDL写入hbase问题

2020-06-11 Thread 酷酷的浑蛋




您是说将那几个jar都放到flink/lib下吗?


在2020年06月11日 14:39,Leonard Xu 写道:
Hi
你服务器上是否配置了hadoop_classpath? 建议hbase在试用时 用 hadoop_classpath + flink-hbase 
jar,不然依赖问题会比较麻烦。

祝好
Leonard Xu

在 2020年6月11日,14:24,酷酷的浑蛋  写道:



在使用flink sql ddl语句向hbase中写的时候报如下错误:
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
at 
org.apache.flink.addons.hbase.HBaseUpsertTableSink.consumeDataStream(HBaseUpsertTableSink.java:87)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:141)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)


项目maven中已经引入过下面依赖
hbase-server
hbase-common
hadoop-common
flink-hbase_2.11
而且我看jar中是有HBaseConfiguration这个类的,为什么放到服务器上执行就报错呢,在本地执行没问题


Re: Flink Async IO operator tuning / micro-benchmarks

2020-06-11 Thread Arti Pande
Hi Arvid,

Thanks for a quick reply.

The second reference link (
http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED=2) from
your answer is not accessible though. Could you share some more numbers
from it? Are these benchmarks published somewhere?

Without actual IO call, Async IO operator benchmark of 1.6 K records/ms per
core translates to *1.6 million records/sec per core*. So an 8 core machine
should give roughly *12.8 million records/sec* ? Is this the correct
number? How do we compare it with this benchmark

article that talks about total throughput of 4 million records/sec (without
Async IO operator) in a cluster of about 10 machines with 16-core each?

Ordered wait is indispensable for our use-case because we need to call the
external (partner organisation) system's API endpoint for each incoming
record. Depending on the response from that API we need to decide how to
process this record and order needs to be preserved. This may not have been
a problem if data ingestion rates were low. Real challenge is because of
the high-speed stream (millions of events per second) of input.

Is higher core machines an answer or is Flink not suitable for use-cases
like this?



On Thu, Jun 11, 2020 at 2:44 AM Arvid Heise  wrote:

> Hi Arti,
>
> microbenchmarks for AsyncIO are available [1] and the results shown in
> [2]. So you can roughly expect 1.6k records/ms per core to be the upper
> limit without any actual I/O. That range should hold for Flink 1.10 and
> coming Flink 1.11. I cannot say much about older versions and you didn't
> specify which you use. But it shouldn't be an order of magnitude different.
>
> The biggest performance improvement will probably be to switch to
> unordered - results are emitted as soon as they arrive. On ordered, the
> first element that came in, needs to be finished before any emission. If
> some elements take longer than others, these slow elements quickly become a
> bottleneck.
>
> If async I/O needs to be ordered, then you need to tweak what you already
> mentioned. Set DOP to the number of physical cores, there is no benefit in
> going higher. If you indeed use an async HTTP client, then the queue size
> should be a bit higher than the thread pool size. The thread pool size will
> effectively limit the parallelism per subtask and you want to saturate that
> from the Flink side. The thread pool size (together with maxConnections)
> will put the hard limit together with the request processing time on your
> application.
>
> I'd probably consider using more machines in your stead instead of more
> cores per machine (sounded like that is an option). So instead of using
> 10x12 cores, use 15x8 cores. You could measure how much max throughput to
> expect by using one machine and use a benchmarking tool that increases the
> requests per second on that machine until it hits the limit. Then you know
> how many machines you need at the very least.
>
> Finally, it might also be a good time to review your architecture.
> Microservices are not the best fit for a streaming application. For
> example, if this is a lookup service, it would scale and fit much better if
> all data could be ingested by Flink as an additional data source (e.g.
> Kafka topic). Existing microservices might be converted into such data
> sources with change-data-capture.
>
> [1]
> https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
> [2] http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED=2
>
> On Wed, Jun 10, 2020 at 10:06 PM Arti Pande  wrote:
>
>> As Flink Async IO operator is designed for external API or DB calls, are
>> there any specific guidelines / tips for scaling up this operator?
>> Particularly for use-cases where incoming events are being ingested at a
>> very high-speed and the Async IO operator with orderedWait mode can not
>> keep up with that speed (although the target API endpoint it is calling is
>> load tested to provide much higher throughput with very minimal latency).
>> In our case adding Async IO operator to the pipeline *reduced the
>> throughput by 88% to 90%*. This is huge performance hit!
>>
>> We tried a couple of things:
>>
>>1. Increasing the async buffer capacity parameter, there by
>>increasing the number of concurrent requests at any given point in time
>>that are waiting for response. This proved counter-productive beyond a 
>> very
>>small number like 50 or 100.
>>2. Increasing the operator parallelism. This does not help much as
>>the number of cores on our machines are limited (8 or 12)
>>3. Tuning the AsyncHTTPClient configuration (keepAlive=true,
>>maxConnections, maxConnectionsPerHost) and the size of
>>FixedThreadPool used by the Listener of its Future. Again without much
>>improvement.
>>
>> Our observation is that 

?????? BLinkPlanner sql join????????

2020-06-11 Thread op
??Blinkplanner??oldplanner??1.10




package test.table.sql


import java.util.Properties


import com.souhu.msns.huyou.PublicParams
import com.souhu.msns.huyou.utils.KafkaPbSchema
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.windowing.time.{Time = WindowTime}
import org.apache.flink.types.Row




object test {


 def main(args: Array[String]): Unit = {


  
//
  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
  bsEnv.setNumberOfExecutionRetries(1)
  bsEnv.setParallelism(1)
  //bsEnv.getConfig.setAutoWatermarkInterval(1)
  bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  bsEnv.setStateBackend(new 
FsStateBackend("hdfs://dc1:8020/user/msns/streaming/checkpoint/flink/Circ", 
true))
  bsEnv.getCheckpointConfig.setCheckpointInterval(30)
  bsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(6)
  bsEnv.setParallelism(3)
  bsEnv.setNumberOfExecutionRetries(1)


  
//TABLE


  val setting = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val bstEnv = StreamTableEnvironment.create(bsEnv,setting)
  val tConfig = bstEnv.getConfig
  
tConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(20))
  val config = bstEnv.getConfig.getConfiguration()
  config.setString("table.exec.mini-batch.enabled", "true") // 
local-global aggregation depends on mini-batch is enabled
  config.setString("table.exec.mini-batch.allow-latency", "5 s")
  config.setString("table.exec.mini-batch.size", "5000")
  config.setString("table.optimizer.agg-phase-strategy", 
"TWO_PHASE") // enable two-phase, i.e. local-global aggregation
  config.setString("table.optimizer.distinct-agg.split.enabled", 
"true")
  //bstEnv.getConfig.setLocalTimeZone(ZoneId.of("Etc/GMT+8"))



  
//??
  val kafkaProps = new Properties()
  kafkaProps.setProperty("bootstrap.servers", PublicParams.brokers)
  val source = 
   
.toTable(bstEnv,'userId,'createTime.rowtime,'action,'circleName,'flowName,'ts,'content,'feedid,'postfeedid,'sessionId)


  bstEnv.createTemporaryView("source",source)


  val q1=bstEnv.sqlQuery(
   """select sessionId from source
|where sessionId is not null
|and action='P_TIMELINE'""".stripMargin)
   q1.toAppendStream[Row].print("source")
  bstEnv.createTemporaryView("sourcefeed",q1)

  val q2=bstEnv.sqlQuery(
   """select sessionId from source
|where sessionId is not null
|and action='V_TIMELINE_FEED'""".stripMargin)
  bstEnv.createTemporaryView("postfeed",q2)

  bstEnv.sqlQuery(
   """
|select count(b.sessionId) from
|sourcefeed a
|join postfeed b
|on a.sessionId=b.sessionId
   """.stripMargin).toRetractStream[Row].print("")




  bstEnv.execute("")
 }
}









----
??:"Leonard Xu"

Re: BLinkPlanner sql join状态清理

2020-06-11 Thread Leonard Xu
Hi,

可以稳定复现吗?麻烦贴下flink版本和你的case, 我可以帮忙跟进确认下

Best,
Leonard Xu
> 在 2020年6月11日,14:30,op <520075...@qq.com> 写道:
> 
> 大家好,最近发现一个问题
> 发现同一套代码,用oldPlanner设置IdleStateRetentionTime后join两边表的状态可以被定时清理,但是用blinkplanner同样的代码,运行却发现状态不会被清理,这是个bug吗?



Re: FLINK SQL DDL写入hbase问题

2020-06-11 Thread Leonard Xu
Hi
你服务器上是否配置了hadoop_classpath? 建议hbase在试用时 用 hadoop_classpath + flink-hbase 
jar,不然依赖问题会比较麻烦。

祝好
Leonard Xu

> 在 2020年6月11日,14:24,酷酷的浑蛋  写道:
> 
> 
> 
> 在使用flink sql ddl语句向hbase中写的时候报如下错误:
> java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
>at 
> org.apache.flink.addons.hbase.HBaseUpsertTableSink.consumeDataStream(HBaseUpsertTableSink.java:87)
>at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:141)
>at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> 
> 
> 项目maven中已经引入过下面依赖
> hbase-server
> hbase-common
> hadoop-common
> flink-hbase_2.11
> 而且我看jar中是有HBaseConfiguration这个类的,为什么放到服务器上执行就报错呢,在本地执行没问题



BLinkPlanner sql join????????

2020-06-11 Thread op

??oldPlannerIdleStateRetentionTime??join??blinkplannerbug

flink on yarn模式的代码运行位置问题

2020-06-11 Thread zjfpla...@hotmail.com
Hi,
我在使用flink的过程中,有些疑问请教下各位:
1.flink分为jobmanger和taskmanager,我怎么区分哪些代码是运行在jobmanager,哪些在taskmanager?
2.假设我jarA中使用AbstractYarnClusterDescriptor.deployJobCluster()替代flink 
run命令(想直接通过jar包启动方式直接提交flink任务上yarn),部署jarB到yarn上,jarB中mainClass中使用StreamExecutionEnvironment.execute去执行流任务,通过java
 -cp jarA的方式来启动,首先能确定的一点是jarA运行在服务器本地端,jarB中mainClass是否已经运行在yarn上了?还是运行在服务器端?
 服务器端指的是java -cp执行的服务器
 我这边本身从日志打印和远程debug(java -cp中加remote debug参数和flink-conf.yaml中加入remote 
debug配置)来看,jarB中的mainClass还未运行在yarn上,这个是什么原因? 
   



zjfpla...@hotmail.com


Re: Understading Flink statefun deployment

2020-06-11 Thread slinkydeveloper
Hi Igal, thanks for your help.
If I understood correctly, the flink deployments (not the functions) needs
to use the same image right? Which means that the flink master and all
workers still needs to use the same image which includes the module.yaml and
the jar with embedded modules of the full project, right?
I was looking for something different: scale the workers independently,
together with the functions. I'm trying to experiment something here: 
https://github.com/slinkydeveloper/playing-with-statefun
  

In this project I'm trying to deploy separately:

* ingress
* egress
* "mapper" function
* "greeter" function

They're all "embedded functions" and I wish to deploy each piece separately
in a separate deployment.
In the next iteration of the experiment I wish to create "remote functions"
deploying them in the same pod of the workers, so the worker talks to the
image using localhost.

Hence my question: Is it possible to deploy one worker per function/group of
functions and compose my application of multiple heterogeneous worker
images? If not, does it make sense to do it or it's just no sense for
statefun architecture?

FG



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re:Re: flink sql bigint cannot be cast to mysql Long

2020-06-11 Thread Zhou Zach
3ku

















在 2020-06-11 14:10:53,"Leonard Xu"  写道:
>Hi, 
>
>JDBC connector 之前不支持 unsigned 类型,unsigned 会比signed 类型更长。
>bigint(20) unsigned(range is 0 to 18446744073709551615) 超过了  bigint (range is 
>-9223372036854775808 to 9223372036854775807)的长度。
>
>
>最新的代码已经修复这个问题了[1],你可以等1.11发布后试用,或者编译下最新的代码,flink 中对应表 声明decimal(20, 0)处理。 
>
>祝好,
>Leonard Xu
>
>[1]  https://issues.apache.org/jira/browse/FLINK-17657 
>
>
>> 在 2020年6月11日,13:51,Zhou Zach  写道:
>> 
>> bigint(20) unsigned
>


FLINK SQL DDL写入hbase问题

2020-06-11 Thread 酷酷的浑蛋


在使用flink sql ddl语句向hbase中写的时候报如下错误:
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
at 
org.apache.flink.addons.hbase.HBaseUpsertTableSink.consumeDataStream(HBaseUpsertTableSink.java:87)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:141)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)


项目maven中已经引入过下面依赖
hbase-server
hbase-common
hadoop-common
flink-hbase_2.11
而且我看jar中是有HBaseConfiguration这个类的,为什么放到服务器上执行就报错呢,在本地执行没问题

Re: flink sql bigint cannot be cast to mysql Long

2020-06-11 Thread Leonard Xu
Hi, 

JDBC connector 之前不支持 unsigned 类型,unsigned 会比signed 类型更长。
bigint(20) unsigned(range is 0 to 18446744073709551615) 超过了  bigint (range is 
-9223372036854775808 to 9223372036854775807)的长度。


最新的代码已经修复这个问题了[1],你可以等1.11发布后试用,或者编译下最新的代码,flink 中对应表 声明decimal(20, 0)处理。 

祝好,
Leonard Xu

[1]  https://issues.apache.org/jira/browse/FLINK-17657 


> 在 2020年6月11日,13:51,Zhou Zach  写道:
> 
> bigint(20) unsigned