Unsubscribe

2020-09-09 Thread 程 婕
Unsubscribe, thanks




Re: Use of slot sharing groups causing workflow to hang

2020-09-09 Thread Xintong Song
Hi Ken,

> I've got a Flink MiniCluster with 12 slots. Even with only 6 pipelined
> operators, each with a parallelism of 1, it still hangs while starting.

Could you double-check that the MiniCluster really has 12 slots when each of
your operators has a parallelism of only 1?

I've looked into the code. Currently, without any explicit configuration,
the MiniCluster creates 1 TaskManager by default, and the number of slots
on that TaskManager is determined by the max parallelism of your job's
operators. That means the number of slots in the MiniCluster changes
dynamically when the operators' parallelism changes.

By default, all operators are in the same slot sharing group, so the
number of slots needed to execute the job is the max parallelism of the
operators. When you separate the operators into more slot sharing groups,
the number of slots needed for job execution increases, and you need
to adjust the configuration manually to provide enough slots. The related
configuration options are `local.number-taskmanager` and
`taskmanager.numberOfTaskSlots`.
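To make the slot arithmetic above concrete, here is a minimal sketch in plain Java (no Flink dependency). It assumes the usual rule for pipelined streaming jobs: each slot sharing group needs as many slots as its highest operator parallelism, and the groups add up. The class name `SlotMath` and the group name `groupA` are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class SlotMath {

    /**
     * Slots needed by a job: for every slot sharing group take the highest
     * operator parallelism in that group, then add the groups up.
     * groups[i] and parallelisms[i] describe operator i.
     */
    static int requiredSlots(String[] groups, int[] parallelisms) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (int i = 0; i < groups.length; i++) {
            maxPerGroup.merge(groups[i], parallelisms[i], Math::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    /** Slots an unconfigured local MiniCluster would offer: the max parallelism of the job. */
    static int defaultLocalSlots(int[] parallelisms) {
        int max = 0;
        for (int p : parallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        // Roughly the setup described in the thread: one part in its own group
        // with parallelism 3, the rest in the default group with parallelism 1 or 2.
        String[] groups = {"default", "default", "groupA"};
        int[] parallelisms = {1, 2, 3};
        System.out.println("slots required:  " + requiredSlots(groups, parallelisms));
        System.out.println("slots available: " + defaultLocalSlots(parallelisms));
    }
}
```

With these assumed numbers the job needs more slots than the default local setup provides, which would match a slot request that is never fulfilled.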

Also, please note that for local execution (from the IDE) the entire Flink
application runs in the same process, so separating the pipeline into
several slot sharing groups will not bring any benefit. If you are just
trying out slot sharing groups, or preparing to deploy the job to a
distributed cluster later, then there should be no problem.

Thank you~

Xintong Song



On Thu, Sep 10, 2020 at 11:22 AM Yangze Guo  wrote:

> Hi, Ken
>
> From the RM perspective, could you share the following logs:
> - "Request slot with profile {} for job {} with allocation id {}.".
> - "Requesting new slot [{}] and profile {} with allocation id {} from
> resource manager."
> This will help to figure out how many slots your job indeed requests.
> And probably help to figure out what the ExecutionGraph finally looks
> like.
>
>
> Best,
> Yangze Guo
>
> On Thu, Sep 10, 2020 at 10:47 AM Ken Krugler
>  wrote:
> >
> > Hi Til,
> >
> > On Sep 3, 2020, at 12:31 AM, Till Rohrmann  wrote:
> >
> > Hi Ken,
> >
> > I believe that we don't have a lot if not any explicit logging about the
> slot sharing group in the code. You can, however, learn indirectly about it
> by looking at the required number of AllocatedSlots in the SlotPool. Also
> the number of "multi task slot" which are created should vary because every
> group of slot sharing tasks will create one of them. For learning about the
> SlotPoolImpl's status, you can also take a look at SlotPoolImpl.printStatus.
> >
> > For the underlying problem, I believe that Yangze could be right. How
> many resources do you have in your cluster?
> >
> >
> > I've got a Flink MiniCluster with 12 slots. Even with only 6 pipelined
> > operators, each with a parallelism of 1, it still hangs while starting.
> So
> > I don't think that it's a resource issue.
> >
> > One odd thing I've noticed. I've got three streams that I union together.
> > Two of the streams are in separate slot sharing groups, the third is not
> > assigned to a group. But when I check the logs, I see three "Create multi
> > task slot" entries. I'm wondering if unioning streams that are in
> different
> > slot sharing groups creates a problem.
> >
> > Thanks,
> >
> > -- Ken
> >
> > On Thu, Sep 3, 2020 at 4:25 AM Yangze Guo  wrote:
> >>
> >> Hi,
> >>
> >> The failure of requesting slots usually because of the lack of
> >> resources. If you put part of the workflow to a specific slot sharing
> >> group, it may require more slots to run the workflow than before.
> >> Could you share logs of the ResourceManager and SlotManager, I think
> >> there are more clues in it.
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Thu, Sep 3, 2020 at 4:39 AM Ken Krugler 
> wrote:
> >> >
> >> > Hi all,
> >> >
> >> > I’ve got a streaming workflow (using Flink 1.11.1) that runs fine
> locally (via Eclipse), with a parallelism of either 3 or 6.
> >> >
> >> > If I set up part of the workflow to use a specific (not “default”)
> slot sharing group with a parallelism of 3, and the remaining portions of
> the workflow have a parallelism of either 1 or 2, then the workflow never
> starts running, and eventually fails due to a slot request not being
> fulfilled in time.
> >> >
> >> > So I’m wondering how best to debug this.
> >> >
> >> > I don’t see any information (even at DEBUG level) being logged about
> which operators are in what slot sharing group, or which slots are assigned
> to what groups.
> >> >
> >> > Thanks,
> >> >
> >> > — Ken
> >> >
> >> > PS - I’ve looked at https://issues.apache.org/jira/browse/FLINK-8712,
> and tried the approach of setting # of slots in the config, but that didn’t
> change anything. I see that issue is still open, so wondering what Til and
> Konstantin have to say about it.
> >> >
> >> > --
> >> > Ken Krugler
> >> > http://www.scaleunlimited.com
> >> > custom big data solutions & training
> >> > Hadoop, Cascading, Cassandra & Solr
> >> >
> >

Re: flink 1.11 taskmanager actual memory usage far exceeds the configured size

2020-09-09 Thread Xintong Song
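Flink cannot fully control all memory overhead; this is inherent to Java applications.
- For memory that the JVM can control, such as Java heap/direct/metaspace, Flink sets JVM parameters so that it cannot be overused.
- For fixed-size buffer pools that Flink maintains itself, such as the network buffer pool and managed memory, Flink also
strictly limits how much memory is allocated.
- For other overhead, such as JVM thread stacks and native methods in user code and third-party dependencies, Flink
cannot limit the memory used. It can only reserve part of the total memory according to the configuration. If not enough is reserved, memory overuse will occur.

If memory is overused, it can only be caused by the third category above. Suggestions:
- Check whether 'state.backend.rocksdb.memory.managed' is set to true. With the default of true, RocksDB
adjusts its memory usage according to the size of managed memory. If it is false, RocksDB's memory usage ignores the TaskManager's
memory configuration and may exceed it.
- Adjust 'taskmanager.memory.jvm-overhead.[min|max|fraction]' so that the TaskManager
reserves more memory.

In addition, see the advice on memory overuse in the official documentation [1].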
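As a rough cross-check of the budget, the byte counts printed in the TaskExecutor startup log quoted below can simply be added up. This is only a sketch in plain Java: the class name `MemoryBudget` is made up, and the 7.5 GiB figure is an assumed reading of the "more than 7.5G" reported in the question.

```java
final class MemoryBudget {

    /** Total Flink memory = JVM heap + off-heap (managed + direct). */
    static long totalFlink(long heap, long offHeap) {
        return heap + offHeap;
    }

    /** Total process memory = Flink memory + metaspace + JVM overhead. */
    static long totalProcess(long flink, long metaspace, long overhead) {
        return flink + metaspace + overhead;
    }

    /** Memory seen from outside the container that the configured budget does not cover. */
    static long unaccounted(long observed, long budget) {
        return observed - budget;
    }

    public static void main(String[] args) {
        // Byte counts copied from the quoted startup log.
        long heap = 134217728L + 1610612705L;      // framework heap + task heap
        long offHeap = 1503238572L + 510027371L;   // managed + JVM direct
        long flink = totalFlink(heap, offHeap);
        long process = totalProcess(flink, 1073741824L, 536870920L);

        // Assumption: read "more than 7.5G" as exactly 7.5 GiB.
        long observed = (long) (7.5 * 1024 * 1024 * 1024);

        System.out.println("configured process size: " + process + " bytes");
        System.out.println("unaccounted for:         " + unaccounted(observed, process) + " bytes");
    }
}
```

The configured components add up to exactly the 5120m process size, so whatever sits above that is memory Flink does not budget for, such as native allocations.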

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/memory/mem_trouble.html#%E5%AE%B9%E5%99%A8container%E5%86%85%E5%AD%98%E8%B6%85%E7%94%A8

On Thu, Sep 10, 2020 at 12:54 PM Z-Z  wrote:

> To add: I'm using RocksDB for state storage.
>
> -- Original message --
> From: "Z-Z" <zz9876543...@qq.com>
> Sent: Thursday, September 10, 2020, 10:08 AM
> To: "user-zh"
> Subject: flink 1.11 taskmanager actual memory usage far exceeds the configured size
>
> Hi, good morning everyone. A question for the experts:
> In Flink docker session mode, the taskmanager memory configuration is
> taskmanager.memory.process.size: 5120m
> taskmanager.memory.jvm-metaspace.size: 1024m
>
> But seen from outside the container, the taskmanager actually uses more than 7.5G of memory, even though the memory parameters printed at taskmanager startup are correct:
> INFO [] - Final TaskExecutor Memory configuration:
> INFO [] -   Total Process Memory:          5.000gb (5368709120 bytes)
> INFO [] -     Total Flink Memory:          3.500gb (3758096376 bytes)
> INFO [] -       Total JVM Heap Memory:     1.625gb (1744830433 bytes)
> INFO [] -         Framework:               128.000mb (134217728 bytes)
> INFO [] -         Task:                    1.500gb (1610612705 bytes)
> INFO [] -       Total Off-heap Memory:     1.875gb (2013265943 bytes)
> INFO [] -         Managed:                 1.400gb (1503238572 bytes)
> INFO [] -         Total JVM Direct Memory: 486.400mb (510027371 bytes)
> INFO [] -           Framework:             128.000mb (134217728 bytes)
> INFO [] -           Task:                  0 bytes
> INFO [] -           Network:               358.400mb (375809643 bytes)
> INFO [] -     JVM Metaspace:               1024.000mb (1073741824 bytes)
> INFO [] -     JVM Overhead:                512.000mb (536870920 bytes)
>
> Dumping with jmap from JDK 1.8 fails, and adding -F does not help either:
> 1: Unable to open socket file: target process not responding or HotSpot VM
> not loaded
>
>
> How can I analyze where its memory is actually going?


Re: flink 1.11 taskmanager actual memory usage far exceeds the configured size

2020-09-09 Thread Z-Z
To add: I'm using RocksDB for state storage.


Re: Flink Plan Visualizer

2020-09-09 Thread JasonLee
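Hi,

The Flink plan visualizer can probably only draw the stream graph, which is a logical DAG.
If you submit the job to a cluster, you can see the job graph in the Flink Web UI. The difference between the stream graph and the job graph is that
the job graph has operator chaining applied as an optimization.
The job graph is generated only after env.execute is called.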



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Use of slot sharing groups causing workflow to hang

2020-09-09 Thread Yangze Guo
Hi, Ken

From the RM perspective, could you share the following logs:
- "Request slot with profile {} for job {} with allocation id {}.".
- "Requesting new slot [{}] and profile {} with allocation id {} from
resource manager."
This will help to figure out how many slots your job indeed requests.
And probably help to figure out what the ExecutionGraph finally looks
like.


Best,
Yangze Guo

On Thu, Sep 10, 2020 at 10:47 AM Ken Krugler
 wrote:
>
> Hi Til,
>
> On Sep 3, 2020, at 12:31 AM, Till Rohrmann  wrote:
>
> Hi Ken,
>
> I believe that we don't have a lot if not any explicit logging about the slot 
> sharing group in the code. You can, however, learn indirectly about it by 
> looking at the required number of AllocatedSlots in the SlotPool. Also the 
> number of "multi task slot" which are created should vary because every group 
> of slot sharing tasks will create one of them. For learning about the 
> SlotPoolImpl's status, you can also take a look at SlotPoolImpl.printStatus.
>
> For the underlying problem, I believe that Yangze could be right. How many 
> resources do you have in your cluster?
>
>
> I've got a Flink MiniCluster with 12 slots. Even with only 6 pipelined
> operators, each with a parallelism of 1, it still hangs while starting. So
> I don't think that it's a resource issue.
>
> One odd thing I've noticed. I've got three streams that I union together.
> Two of the streams are in separate slot sharing groups, the third is not
> assigned to a group. But when I check the logs, I see three "Create multi
> task slot" entries. I'm wondering if unioning streams that are in different
> slot sharing groups creates a problem.
>
> Thanks,
>
> -- Ken
>
> On Thu, Sep 3, 2020 at 4:25 AM Yangze Guo  wrote:
>>
>> Hi,
>>
>> The failure of requesting slots usually because of the lack of
>> resources. If you put part of the workflow to a specific slot sharing
>> group, it may require more slots to run the workflow than before.
>> Could you share logs of the ResourceManager and SlotManager, I think
>> there are more clues in it.
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Sep 3, 2020 at 4:39 AM Ken Krugler  
>> wrote:
>> >
>> > Hi all,
>> >
>> > I’ve got a streaming workflow (using Flink 1.11.1) that runs fine locally 
>> > (via Eclipse), with a parallelism of either 3 or 6.
>> >
>> > If I set up part of the workflow to use a specific (not “default”) slot 
>> > sharing group with a parallelism of 3, and the remaining portions of the 
>> > workflow have a parallelism of either 1 or 2, then the workflow never 
>> > starts running, and eventually fails due to a slot request not being 
>> > fulfilled in time.
>> >
>> > So I’m wondering how best to debug this.
>> >
>> > I don’t see any information (even at DEBUG level) being logged about which 
>> > operators are in what slot sharing group, or which slots are assigned to 
>> > what groups.
>> >
>> > Thanks,
>> >
>> > — Ken
>> >
>> > PS - I’ve looked at https://issues.apache.org/jira/browse/FLINK-8712, and 
>> > tried the approach of setting # of slots in the config, but that didn’t 
>> > change anything. I see that issue is still open, so wondering what Til and 
>> > Konstantin have to say about it.
>> >
>> > --
>> > Ken Krugler
>> > http://www.scaleunlimited.com
>> > custom big data solutions & training
>> > Hadoop, Cascading, Cassandra & Solr
>> >
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>


Re: Flink Plan Visualizer

2020-09-09 Thread zilong xiao
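Is there a way to draw the job graph?

黄潇 wrote on Tue, Sep 8, 2020 at 8:32 PM:

> Hi,
>
> As far as I know, the JSON string [1] returned by env.getExecutionPlan() only contains stream graph
> information, so the graph drawn from it is the stream graph.
> After the job is submitted, the web UI shows the graph after operator chaining.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/execution_plans.html
>
> zilong xiao wrote on Tue, Sep 8, 2020 at 8:00 PM:
>
> > Hi, a question: can the Flink Plan Visualizer draw the job graph? From what I found online, it seems it can only draw the streaming
> > graph from the execution plan?
> >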
>


Re: flink sql 1.11.1 insert data to hive from kafka split into two jobs

2020-09-09 Thread Qishang
Hi 大罗,
Try this method: org.apache.flink.table.api.StatementSet#execute
ss.execute();

大罗 wrote on Wed, Sep 9, 2020 at 3:13 PM:

> Hi, I've run into a problem. In my code I use Flink SQL 1.11 to insert data from Kafka into Hive tables, as follows:
>
> First, read JSON array strings from Kafka, for example [{"pid":"a", "val":1, "data_type": 1, "app_type"
> :2},
> {"pid":"a", "val":1, "data_type": 1, "app_type" :2}]
>
> Then use flatMap to turn this data into single objects in runDataStream: {"pid":"a", "val":1, "data_type": 1,
> "app_type" :2}
>
> Write runDataStream to Redis: runDataStream.addSink(new CustomRedisSink())
>
> Then create a temporary view, for example:
> tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator,
> $("pid"),  $("val"), $("app_type"), $("data_type"));
>
> Next, define the different SQL statements, for example:
> String sql1 = "insert into ods_data_10 select pid, val where data_type = 1
> and app_type = 0"
> String sql2 = "insert into ods_data_11 select pid, val where data_type = 1
> and app_type = 1"
> String sql3 = "insert into ods_data_01 select pid, val where data_type = 0
> and app_type = 1"
> String sql4 = "insert into ods_data_00 select pid, val where data_type = 0
> and app_type = 0"
>
> Run them with a StatementSet:
> StatementSet ss = tableEnv.createStatementSet();
> ss.addInsertSql(sql1);
> ss.addInsertSql(sql2);
> ss.addInsertSql(sql3);
> ss.addInsertSql(sql4);
>
> Finally, execute the job:
> env.execute(jobName);
>
> Everything looks normal and no error is reported, but the web UI shows that two jobs were submitted, as in the screenshot:
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150820%402x.png>
>
>
> Job "EconStreamingToHiveHbaseRedisJob" should correspond to the write to Redis (call its job ID jobA),
>
> Job "insert-into_myhive.dw.ods_analog_sems
> ***" should correspond to the writes to the 4 tables (call its job ID jobB), as in the screenshot:
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/QQ20200909-150928%402x.png>
>
>
> The topmost operator is defined as follows:
> Source: Custom Source -> Map -> Flat Map -> Filter ->
> SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et,
> run_data_type]) ->
> (Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE
> _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE
> _UTF-16LE'BP.%')))]) -> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))])
> ->
> StreamingFileWriter,
> Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
> _UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
> AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))])
> ->
> StreamingFileWriter)
>
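> My question is about stopping these jobs. For example, "./bin/flink stop -m :8081 jobA"
> generates a savepoint, e.g. "Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a
> savepoint."
> Stopping jobB likewise generates a savepoint.
>
> Is there a required order for stopping jobA and jobB, and which savepoint should I use to restart the jobs smoothly?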
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Use of slot sharing groups causing workflow to hang

2020-09-09 Thread Ken Krugler
Hi Till,

> On Sep 3, 2020, at 12:31 AM, Till Rohrmann  wrote:
> 
> Hi Ken,
> 
> I believe that we don't have a lot if not any explicit logging about the slot 
> sharing group in the code. You can, however, learn indirectly about it by 
> looking at the required number of AllocatedSlots in the SlotPool. Also the 
> number of "multi task slot" which are created should vary because every group 
> of slot sharing tasks will create one of them. For learning about the 
> SlotPoolImpl's status, you can also take a look at SlotPoolImpl.printStatus.
> 
> For the underlying problem, I believe that Yangze could be right. How many 
> resources do you have in your cluster?

I've got a Flink MiniCluster with 12 slots. Even with only 6 pipelined
operators, each with a parallelism of 1, it still hangs while starting. So
I don't think that it's a resource issue.

One odd thing I've noticed. I've got three streams that I union together.
Two of the streams are in separate slot sharing groups, the third is not
assigned to a group. But when I check the logs, I see three "Create multi
task slot" entries. I'm wondering if unioning streams that are in different
slot sharing groups creates a problem.

Thanks,

-- Ken

> On Thu, Sep 3, 2020 at 4:25 AM Yangze Guo wrote:
> Hi,
> 
> The failure of requesting slots usually because of the lack of
> resources. If you put part of the workflow to a specific slot sharing
> group, it may require more slots to run the workflow than before.
> Could you share logs of the ResourceManager and SlotManager, I think
> there are more clues in it.
> 
> Best,
> Yangze Guo
> 
> On Thu, Sep 3, 2020 at 4:39 AM Ken Krugler wrote:
> >
> > Hi all,
> >
> > I’ve got a streaming workflow (using Flink 1.11.1) that runs fine locally 
> > (via Eclipse), with a parallelism of either 3 or 6.
> >
> > If I set up part of the workflow to use a specific (not “default”) slot 
> > sharing group with a parallelism of 3, and the remaining portions of the 
> > workflow have a parallelism of either 1 or 2, then the workflow never 
> > starts running, and eventually fails due to a slot request not being 
> > fulfilled in time.
> >
> > So I’m wondering how best to debug this.
> >
> > I don’t see any information (even at DEBUG level) being logged about which 
> > operators are in what slot sharing group, or which slots are assigned to 
> > what groups.
> >
> > Thanks,
> >
> > — Ken
> >
> > PS - I’ve looked at https://issues.apache.org/jira/browse/FLINK-8712,
> > and tried the approach of setting # of slots in the config, but that didn’t
> > change anything. I see that issue is still open, so wondering what Till and
> > Konstantin have to say about it.
> >
> > --
> > Ken Krugler
> > http://www.scaleunlimited.com 
> > custom big data solutions & training
> > Hadoop, Cascading, Cassandra & Solr
> >

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Flink 1.5.0 savepoint fails

2020-09-09 Thread likai
Hi all. Triggering a savepoint fails for me on Flink 1.5.0.
Shared directory: /data/emr_flink_savepoint_share/
Trigger command: bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 
file:///data/emr_flink_savepoint_share 
The default file system in the Hadoop conf is hdfs://flink-hdfs
Error:
Caused by: java.lang.Exception: Could not materialize checkpoint 9381 for 
operator Source: KafkaJSONStringTableSource -> Map -> where: (OR(=(e, 
_UTF-16LE'INSERT'), =(e, _UTF-16LE'DELETE'), =(e, _UTF-16LE'UPDATE'))), select: 
(CAST(get_json_object(data, _UTF-16LE'pid')) AS EXPR$0, 
CAST(get_json_object(data, _UTF-16LE'tag_id')) AS EXPR$1, 
CAST(get_json_object(data, _UTF-16LE'tag_type')) AS EXPR$2, 
get_json_object(data, _UTF-16LE'tag_name') AS EXPR$3, 
CAST(get_json_object(data, _UTF-16LE'tag_version')) AS EXPR$4, 
CAST(get_json_object(data, _UTF-16LE'att_type')) AS EXPR$5, 
CAST(get_json_object(data, _UTF-16LE'is_del')) AS EXPR$6, e) -> to: Tuple2 -> 
Sink: Unnamed (1/1).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could 
not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
... 5 more
Caused by: java.io.IOException: Could not open output stream for state backend
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:360)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:161)
at 
org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoWriterV2.writeOperatorStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:142)
at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:77)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:411)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
at 
org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: java.lang.IllegalArgumentException: Wrong FS: 
file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1,
 expected: hdfs://flink-hdfs
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:193)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:140)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:36)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:348)
... 18 more
When I tested locally, 1.5.0 was able to trigger the savepoint.

Re: Real-time GMV statistics with Flink: how to handle an order amount that changes in the afternoon

2020-09-09 Thread Benchao Li
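1.11 added changelog support. There are currently two built-in formats, canal [1] and debezium [2], that read binlog data and produce a changelog.
If you have your own binlog format, you can also implement a custom format.

As long as the source produces changelog data, the downstream operators handle update messages automatically. Put simply, you can think of it as:
1. append / update_after messages are accumulated onto the aggregate
2. delete / update_before messages are retracted from the aggregate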
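The two rules above can be sketched in a few lines of plain Java. This is an illustration of the accumulate/retract idea only, not Flink's aggregation operator; the class `RetractingSum` and its `Kind` enum are made-up names, and amounts are plain `long` values for simplicity.

```java
final class RetractingSum {

    /** Changelog message kinds, named after the four cases listed above. */
    enum Kind { APPEND, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private long sum;

    /** Accumulate append / update_after, retract delete / update_before. */
    void apply(Kind kind, long amount) {
        switch (kind) {
            case APPEND:
            case UPDATE_AFTER:
                sum += amount;
                break;
            case UPDATE_BEFORE:
            case DELETE:
                sum -= amount;
                break;
        }
    }

    long value() {
        return sum;
    }

    public static void main(String[] args) {
        RetractingSum gmv = new RetractingSum();
        gmv.apply(Kind.APPEND, 1000);         // order 009 created in the morning
        gmv.apply(Kind.UPDATE_BEFORE, 1000);  // afternoon change: old row is retracted
        gmv.apply(Kind.UPDATE_AFTER, 500);    // afternoon change: new row is accumulated
        System.out.println("GMV = " + gmv.value());
    }
}
```

For the order 009 example from the original question, the morning amount is retracted when the afternoon update arrives, so the running total reflects only the latest amount.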


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html

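忝忝向仧 <153488...@qq.com> wrote on Wed, Sep 9, 2020 at 10:54 PM:

> Has point 1 actually been used in a real case?
> Does it mean that in 1.11+ you can decide at the SQL level whether the aggregation uses the update_before record or the update_after record?
> Where is it indicated which one is used? How does Flink know whether to take the after or the before record?
> Thanks.
>
> -- Original message --
> From: "user-zh" <libenc...@apache.org>
> Sent: Wednesday, September 9, 2020, 1:09 PM
> To: "user-zh"
> Subject: Re: Real-time GMV statistics with Flink: how to handle an order amount that changes in the afternoon
>
> I don't know whether you are using SQL or the DataStream API. If it is SQL, I think you can do it like this:
> 1. First, with version 1.11+ you can use a binlog format directly. Changes to the data are then automatically
> mapped to update_before and update_after records, which Flink's internal operators, including aggregation
> operators, handle correctly. For example, with a query like select sum(xxx) from T group by
> yyy, the sum metric takes care of this automatically.
> 2. If you don't use binlog mode and only want to aggregate over the latest data, you can also use the deduplication operator [1] to turn the append stream into a retract stream. The same downstream
> aggregation logic then gives the same result.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>
>
> xuzh 
>  Scenario:
>  Compute each day's GMV in real time, but order amounts can be modified.
>  Orders are stored in MySQL and synced to Kafka in real time by a binlog parsing tool. The day's order total is then computed in real time from Kafka.
>  Suppose order 009 is created at 10 AM with an amount of 1000. One JSON record is produced to Kafka, and the real-time GMV is 1000.
>  Then at 3 PM the amount of order 009 is changed to 500. That change is also produced as JSON into Kafka.
>  If the amount already counted in the morning is not subtracted at this point, the total is wrong.
>  Do I need to write this subtraction logic based on update/delete? Daily deduplication won't work, because processing is incremental and the morning's data has already been processed and can't be fetched again.
> 
> 
>  I'm new to real-time processing, so any advice from the experts is appreciated.
>
>
>
> --
>
> Best,
> Benchao Li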



-- 

Best,
Benchao Li


flink 1.11 taskmanager actual memory usage far exceeds the configured size

2020-09-09 Thread Z-Z
Hi, good morning everyone. A question for the experts:
In Flink docker session mode, the taskmanager memory configuration is
taskmanager.memory.process.size: 5120m
taskmanager.memory.jvm-metaspace.size: 1024m

But seen from outside the container, the taskmanager actually uses more than 7.5G of memory, even though the memory parameters printed at taskmanager startup are correct:
INFO [] - Final TaskExecutor Memory configuration:
INFO [] -   Total Process Memory:          5.000gb (5368709120 bytes)
INFO [] -     Total Flink Memory:          3.500gb (3758096376 bytes)
INFO [] -       Total JVM Heap Memory:     1.625gb (1744830433 bytes)
INFO [] -         Framework:               128.000mb (134217728 bytes)
INFO [] -         Task:                    1.500gb (1610612705 bytes)
INFO [] -       Total Off-heap Memory:     1.875gb (2013265943 bytes)
INFO [] -         Managed:                 1.400gb (1503238572 bytes)
INFO [] -         Total JVM Direct Memory: 486.400mb (510027371 bytes)
INFO [] -           Framework:             128.000mb (134217728 bytes)
INFO [] -           Task:                  0 bytes
INFO [] -           Network:               358.400mb (375809643 bytes)
INFO [] -     JVM Metaspace:               1024.000mb (1073741824 bytes)
INFO [] -     JVM Overhead:                512.000mb (536870920 bytes)

Dumping with jmap from JDK 1.8 fails, and adding -F does not help either:
1: Unable to open socket file: target process not responding or HotSpot VM not 
loaded


How can I analyze where its memory is actually going?

?????? flink-sql????????on kafka??flink table??????select????flink table????????????group id??????

2020-09-09 Thread ??????
CREATE TABLE ODS_PaymentOrdert (
  orderId INT,
  memberId INT,
  orderAmount DECIMAL(10, 2),
  paymentStatus SMALLINT,
  orderDate VARCHAR,
  payDate VARCHAR,
  paymentIP VARCHAR,
  orderSrc VARCHAR,
  channelType SMALLINT,
  productId SMALLINT,
  amount SMALLINT,
  unit VARCHAR,
  paymentChannel SMALLINT,
  serviceOrderType SMALLINT,
  refundAmount DECIMAL(10, 2),
  proctime AS PROCTIME(),
  PRIMARY KEY (orderId) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'flink-sql',
  'properties.bootstrap.servers' = 'xx.xx.xx.xxx:9092',
  'topic' = 'ODS_PaymentOrdert',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'canal-json'
);


kafka table option??


checkpoint??


----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#%E9%85%8D%E7%BD%AE-kafka-consumer-%E5%BC%80%E5%A7%8B%E6%B6%88%E8%B4%B9%E7%9A%84%E4%BD%8D%E7%BD%AE
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#kafka-consumer-%E6%8F%90%E4%BA%A4-offset-%E7%9A%84%E8%A1%8C%E4%B8%BA%E9%85%8D%E7%BD%AE

 ?? 2020??9??916:24 <466792...@qq.com ??
 
 ??kafka??flink 
table??select??selectgroup
 id??

Re: Unsubscribe

2020-09-09 Thread Xintong Song
Hello,

To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org


Thank you~

Xintong Song



On Thu, Sep 10, 2020 at 10:03 AM 邢明浩  wrote:

> Unsubscribe


Re: Unsubscribe

2020-09-09 Thread Xingbo Huang
Hi,

To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org

For details, see [1]

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

邢明浩 wrote on Thu, Sep 10, 2020 at 10:03 AM:

> Unsubscribe


Unsubscribe

2020-09-09 Thread 邢明浩
Unsubscribe

Re: Integrating Flink with Spring

2020-09-09 Thread ??????
flinkjar


| |
??
|
|
??15927482...@163.com
|

??  

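On 2020-09-10 09:09, 1115098...@qq.com wrote:
Hi everyone, while integrating Spring Boot into Flink I've run into many problems, and the two don't seem very compatible. The official documentation also has no guide to integrating Spring
Boot. Was Flink simply not designed with Spring Boot integration in mind?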

Unsubscribe

2020-09-09 Thread 邢明浩
Unsubscribe, thanks

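Re: Integrating Flink with Spring

2020-09-09 Thread Jeff Zhang
You could take a look at this Zeppelin SDK; it may suit you:
https://www.yuque.com/jeffzhangjianfeng/gldg8w/pz2xoh


1115098...@qq.com wrote on Thu, Sep 10, 2020 at 9:09 AM:

> Hi everyone, while integrating Spring Boot into Flink I've run into many problems, and the two don't seem very compatible. The official documentation also has no guide to integrating Spring
> Boot. Was Flink simply not designed with Spring Boot integration in mind?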



-- 
Best Regards

Jeff Zhang


Re: Flink 1.11 JDBC query against PostgreSQL fails

2020-09-09 Thread wdmcode
Hi Jimmy

Try putting double quotes around the column names:
Select "F1","F2" from xxx.xxx;
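The quoting suggestion relies on standard PostgreSQL behavior: unquoted identifiers are folded to lower case, while double-quoted identifiers keep their case. A minimal sketch in plain Java of building such a query string follows. The helper names `quote` and `select` are made up for illustration and are not part of Flink or the JDBC connector, and whether the Flink JDBC connector passes the quoted names through unchanged is not confirmed here.

```java
final class PgIdentifiers {

    /**
     * Wraps an identifier in double quotes so PostgreSQL keeps its case.
     * An embedded double quote is escaped by doubling it.
     */
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    /** Builds a SELECT over quoted column names; the table name is passed through as given. */
    static String select(String table, String... columns) {
        StringBuilder sql = new StringBuilder("SELECT ");
        for (int i = 0; i < columns.length; i++) {
            if (i > 0) {
                sql.append(", ");
            }
            sql.append(quote(columns[i]));
        }
        return sql.append(" FROM ").append(table).toString();
    }

    public static void main(String[] args) {
        // F1, F2 and xxx.xxx are the placeholders used in the reply above.
        System.out.println(select("xxx.xxx", "F1", "F2"));
    }
}
```

Note that the quotes must be plain ASCII double quotes; typographic quotes are not valid identifier delimiters in SQL.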


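From: Jimmy Zhang
Sent: Thursday, September 10, 2020 9:41 AM
To: user-zh@flink.apache.org
Subject: Flink 1.11 JDBC query against PostgreSQL fails

When querying a PostgreSQL table over JDBC with Flink 1.11, the table's column names are upper case, but Flink converts them to lower case, which makes the query fail. Does anyone know about this problem?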

Best,
Jimmy 
Signature is customized by Netease Mail Master



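Flink 1.11 JDBC query against PostgreSQL fails

2020-09-09 Thread Jimmy Zhang
When querying a PostgreSQL table over JDBC with Flink 1.11, the table's column names are upper case, but Flink converts them to lower case, which makes the query fail. Does anyone know about this problem?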



|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

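Re: Real-time GMV statistics with Flink: how to handle an order amount that changes in the afternoon

2020-09-09 Thread lec ssmi
I think you can simply retract by order id (use last_value with group by id)
and then sum. Just make sure the state retention time you configure is longer than the period during which an order amount can still change.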
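The "last value per order id, then sum" idea can be sketched in plain Java as follows. This only illustrates the logic; it is not the SQL itself, and the class `LatestAmountSum` is a made-up name. The map plays the role of the per-key state, which is why the state retention time mentioned above must outlive the period in which an order can still change.

```java
import java.util.HashMap;
import java.util.Map;

final class LatestAmountSum {

    // Latest known amount per order id (stands in for the per-key state).
    private final Map<String, Long> latestByOrder = new HashMap<>();
    private long sum;

    /** Records a new amount for an order, replacing any earlier amount in the total. */
    void onOrder(String orderId, long amount) {
        Long previous = latestByOrder.put(orderId, amount);
        if (previous != null) {
            sum -= previous;   // retract the amount counted earlier
        }
        sum += amount;
    }

    long sum() {
        return sum;
    }

    public static void main(String[] args) {
        LatestAmountSum gmv = new LatestAmountSum();
        gmv.onOrder("009", 1000);  // morning
        gmv.onOrder("010", 200);   // hypothetical second order
        gmv.onOrder("009", 500);   // afternoon change to order 009
        System.out.println("GMV = " + gmv.sum());
    }
}
```

If the state for an order id were dropped before its last change arrived, the earlier amount could no longer be retracted and the order would be counted twice.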

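忝忝向仧 <153488...@qq.com> wrote on Wed, Sep 9, 2020 at 10:54 PM:

> Has point 1 actually been used in a real case?
> Does it mean that in 1.11+ you can decide at the SQL level whether the aggregation uses the update_before record or the update_after record?
> Where is it indicated which one is used? How does Flink know whether to take the after or the before record?
> Thanks.
>
> -- Original message --
> From: "user-zh" <libenc...@apache.org>
> Sent: Wednesday, September 9, 2020, 1:09 PM
> To: "user-zh"
> Subject: Re: Real-time GMV statistics with Flink: how to handle an order amount that changes in the afternoon
>
> I don't know whether you are using SQL or the DataStream API. If it is SQL, I think you can do it like this:
> 1. First, with version 1.11+ you can use a binlog format directly. Changes to the data are then automatically
> mapped to update_before and update_after records, which Flink's internal operators, including aggregation
> operators, handle correctly. For example, with a query like select sum(xxx) from T group by
> yyy, the sum metric takes care of this automatically.
> 2. If you don't use binlog mode and only want to aggregate over the latest data, you can also use the deduplication operator [1] to turn the append stream into a retract stream. The same downstream
> aggregation logic then gives the same result.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>
>
> xuzh 
>  Scenario:
>  Compute each day's GMV in real time, but order amounts can be modified.
>  Orders are stored in MySQL and synced to Kafka in real time by a binlog parsing tool. The day's order total is then computed in real time from Kafka.
>  Suppose order 009 is created at 10 AM with an amount of 1000. One JSON record is produced to Kafka, and the real-time GMV is 1000.
>  Then at 3 PM the amount of order 009 is changed to 500. That change is also produced as JSON into Kafka.
>  If the amount already counted in the morning is not subtracted at this point, the total is wrong.
>  Do I need to write this subtraction logic based on update/delete? Daily deduplication won't work, because processing is incremental and the morning's data has already been processed and can't be fetched again.
> 
> 
>  I'm new to real-time processing, so any advice from the experts is appreciated.
>
>
>
> --
>
> Best,
> Benchao Li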


Unsubscribe

2020-09-09 Thread harveyer
Unsubscribe, thanks

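Integrating Flink with Spring

2020-09-09 Thread 1115098...@qq.com
Hi everyone, while integrating Spring Boot into Flink I've run into many problems, and the two don't seem very compatible. The official documentation also has no guide to integrating Spring
Boot. Was Flink simply not designed with Spring Boot integration in mind?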

Re: Slow Performance inquiry

2020-09-09 Thread David Anderson
Heidy, which state backend are you using? With RocksDB Flink will have to
do ser/de on every access and update, but with the FsStateBackend, your
sparse matrix will sit in memory, and only have to be serialized during
checkpointing.
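To illustrate why per-access ser/de hurts when a whole matrix lives in one value, here is a small sketch in plain Java. It is not Flink code: Java's built-in serialization and `java.util.BitSet` stand in for Flink's serializers and the sparse matrix, and the class `SerdePerAccess` is a made-up name. The point is only that a byte-oriented store must decode and re-encode the entire value for every single-cell update.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.BitSet;

final class SerdePerAccess {

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Updates one cell of a value that is stored as bytes: the whole object is
     * decoded, changed, and encoded again, no matter how small the change is.
     */
    static byte[] setBitInStoredValue(byte[] stored, int bit) {
        BitSet matrix = (BitSet) deserialize(stored);
        matrix.set(bit);
        return serialize(matrix);
    }

    public static void main(String[] args) {
        byte[] stored = serialize(new BitSet(1_000_000));
        long bytesMoved = 0;
        for (int i = 0; i < 100; i++) {
            stored = setBitInStoredValue(stored, i * 9_973);
            bytesMoved += 2L * stored.length; // one read and one write of the full value
        }
        System.out.println("bytes (de)serialized for 100 single-bit updates: " + bytesMoved);
    }
}
```

With a heap-based backend the same update would touch only the object in memory, and serialization would happen once per checkpoint.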

David

On Wed, Sep 9, 2020 at 2:41 PM Heidi Hazem Mohamed 
wrote:

> Hi Walther,
>
> Many thanks for your answer, I declared the state type as below
>
> ValueStateDescriptor<SparseBinaryMatrix> descriptor =
>         new ValueStateDescriptor<>(
>                 "Rating Matrix",
>                 TypeInformation.of(new TypeHint<SparseBinaryMatrix>() {
>                 }));
>
>
> Is there a better way?
>
> Regards,
>
> Heidy
> --
> *From:* Timo Walther 
> *Sent:* Wednesday, September 9, 2020 1:58 PM
> *To:* user@flink.apache.org 
> *Subject:* Re: Slow Performance inquiry
>
> Hi Hazem,
>
> I guess your performance is mostly driven by the serialization overhead
> in this case. How do you declare your state type?
>
> Flink comes with different serializers. Not all of them are extracted
> automatically when using reflective extraction methods:
>
> -  Note that `Serializable` declaration has no effect for Flink, other
> than NOT using Flink's efficient serializers.
> - Flink's POJO serializer only works with a default constructor present.
> - Row needs an explicit declaration of its fields.
>
> Regards,
> Timo
>
>
> On 09.09.20 13:08, Heidi Hazem Mohamed wrote:
> > Dear,
> >
> > I am writing a Flink program (a recommender system) that needs a matrix as
> > state, namely the rating matrix. Because the matrix is very sparse, I
> > implemented a sparse binary matrix that saves memory by storing only the
> > ones rather than the whole matrix. I use it as the data type of a
> > ValueState, but unexpectedly the performance became terrible and the job
> > became very slow. Any suggestions on what the problem might be?
> >
> > My first implementation for the rating matrix state :
> >
> > MapState>ratingMatrix;
> >
> >
> > The second implementation (the slow one) of the rating matrix state:
> >
> > ValueState<SparseBinaryMatrix> userItemRatingHistory;
> >
> >
> > and this is part of the SparseBinaryMatrix class:
> >
> > public class SparseBinaryMatrix implements Serializable {
> >
> >     private ArrayList<Row> content;
> >     private int rowLength;
> >
> >     private HashMap columnLabels;
> >     private HashMap inverseColumnLabels;
> >
> >     private HashMap rowLabels;
> >     private HashMap inverseRowLabels;
> >
> >     private enum LabelerType {Row, Column};
> >
> >     public Integer colNumber;
> >     public Integer rowNumber;
> >
> >     // This constructor initializes the matrix with zeros
> >     public SparseBinaryMatrix(int rows, int columns) {
> >         content = new ArrayList<>(rows);
> >         rowLength = columns;
> >         // for (int i = 0; i < rows; i++)
> >         //     content.add(new Row(columns));
> >     }
> >
> >
> >
> > Could depending on another class (Row) lead to this terrible performance?
> > Row is a class I have implemented, and this is part of it:
> >
> > public class Row implements Serializable {
> >  //This is an alternating sorted array
> > private ArrayList content;
> > private int length=0;
> >
> > public Row (int numbColumns)
> >  {
> >  length = numbColumns;
> > for (int i =0; i < numbColumns;i++)
> >  setColumnToZero(i);
> > }
> >
> >  public Row (int[] initialValues )
> >  {
> >  length = initialValues.length;
> > content =new ArrayList<>(length);
> > for (int i =0; i < length; i++)
> >  setColumn(i, initialValues[i]);
> > }
> >
> >
> > Regards,
> >
> > Heidy
> >
>
>


Re: Difficulties with Minio state storage

2020-09-09 Thread Rex Fenley
Good news!

Eliminating
bsEnv.setStateBackend(
  new RocksDBStateBackend(
    "s3://flink-jdbc-test_graph-minio_1/data/checkpoints:9000",
    true
  )
)
from the job code, moving all configuration into FLINK_PROPERTIES, and
switching to http seemed to do the trick!

Thanks for all the help!
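
A possible reading of why this helped, sketched in plain Java: in an s3:// path the authority normally names a bucket rather than a server, while the MinIO server address belongs in the separate s3.endpoint setting (an http:// URL in the configuration quoted below). The snippet uses only java.net.URI to split the path from this thread; the class and method names are made up for illustration, and how each Flink S3 plugin parses the URI internally is not verified here.

```java
import java.net.URI;

public class S3UriSketch {

    /** The authority component, which an s3:// path uses for the bucket name. */
    public static String bucketOf(String s3Uri) {
        return URI.create(s3Uri).getAuthority();
    }

    /** The path component, which an s3:// path uses for the object key prefix. */
    public static String keyPrefixOf(String s3Uri) {
        return URI.create(s3Uri).getPath();
    }

    public static void main(String[] args) {
        // Path from the setStateBackend call in this thread: the ":9000" ends up in the key.
        String fromThread = "s3://flink-jdbc-test_graph-minio_1/data/checkpoints:9000";
        System.out.println("bucket: " + bucketOf(fromThread));
        System.out.println("key:    " + keyPrefixOf(fromThread));

        // Bucket-style path, as in the state.checkpoints.dir example quoted in this thread.
        String bucketStyle = "s3://flink/checkpoints";
        System.out.println("bucket: " + bucketOf(bucketStyle));
        System.out.println("key:    " + keyPrefixOf(bucketStyle));
    }
}
```

Under that reading, the container name was being treated as a bucket and the port became part of the object key, which would fit the Bad Request responses.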



On Wed, Sep 9, 2020 at 9:45 AM Rex Fenley  wrote:

> Thanks yall,
>
> Yangze,
> > I've tried to use MinIO as state backend and everything seems works well
> For clarity, I'm using RocksDB state backend with Minio as state storage.
> > s3.endpoint: http://localhost:9000
> Also, I'm doing everything from docker-compose so localhost isn't going to
> work in my case.
>
>
> Arvid,
> > You definitely need to use an http endpoint.
> I always receive errors like the following when I use http:
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'http'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded. For a full list of supported file systems, please
> see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
> Whereas s3:// gives me Bad Request errors instead
>
> Thanks
>
>
> On Wed, Sep 9, 2020 at 8:03 AM Arvid Heise  wrote:
>
>> Hi Rex,
>>
>> you could also check the end to end tests that use minio in flink's repo.
>> You definitely need to use an http endpoint.
>>
>> The setup [1] uses also another way to specify the s3.path.style.access
>> (with dashes). I think we needed it especially for presto. It seems like
>> the settings differ a bit across the implementations, so give it a try. It
>> might also be something that we should translate.
>> For reference, the actual test using presto can be found here [2].
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/common_s3_minio.sh#L115
>> [2]
>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh#L64
>>
>> On Wed, Sep 9, 2020 at 4:17 AM Yangze Guo  wrote:
>>
>>> Hi, Rex,
>>>
>>> I've tried to use MinIO as state backend and everything seems works well.
>>> Just sharing my configuration:
>>> ```
>>> s3.access-key:
>>> s3.secret-key:
>>> s3.endpoint: http://localhost:9000
>>> s3.path.style.access: true
>>> state.checkpoints.dir: s3://flink/checkpoints
>>> ```
>>>
>>> I think the problem might be caused by the following reasons:
>>> - The MinIO is not well configured.
>>> - Maybe you need to create a bucket for it first. In my case, I create
>>> a bucket called "flink" first.
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Wed, Sep 9, 2020 at 9:33 AM Rex Fenley  wrote:
>>> >
>>> > Hello!
>>> >
>>> > I'm trying to test out Minio as state storage backend using
>>> docker-compose on my local machine but keep running into errors that seem
>>> strange to me. Any help would be much appreciated :)
>>> >
>>> > The problem:
>>> > With the following environment:
>>> >
>>> > environment:
>>> > - |
>>> > FLINK_PROPERTIES=
>>> > jobmanager.rpc.address: flink-jobmanager
>>> > parallelism.default: 2
>>> > s3.access-key: 
>>> > s3.secret-key: 
>>> > s3.path.style.access: true
>>> >
>>> > And the following State Backend (with flink-jdbc-test_graph-minio_1
>>> being the container serving minio):
>>> >
>>> > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>> > bsEnv.setStateBackend(
>>> > new RocksDBStateBackend(
>>> > "s3://flink-jdbc-test_graph-minio_1/data/checkpoints:9000",
>>> > true
>>> > )
>>> > )
>>> >
>>> > And submitting the flink job and saving from another docker container
>>> like so:
>>> >
>>> > flink run -m flink-jdbc-test_flink-jobmanager_1:8081 -c >> Name> .jar
>>> >
>>> > flink savepoint -m flink-jdbc-test_flink-jobmanager_1:8081 
>>> s3://flink-jdbc-test_graph-minio_1:9000/data/savepoints
>>> >
>>> > I end up with the following error:
>>> >
>>> > Caused by:
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
>>> com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service:
>>> Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID:
>>> A7E3BB7EEFB524FD; S3 Extended Request ID:
>>> cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=),
>>> S3 Extended Request ID:
>>> cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=
>>> (Path:
>>> s3://flink-jdbc-test_graph-minio_1:9000/data/savepoints/savepoint-5c4090-5f90e0cdc603/_metadata)
>>> > at
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
>>> > at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
>>> > at
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
>>> > at
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
>>> > at 

Re: Watermark generation issues with File sources in Flink 1.11.1

2020-09-09 Thread David Anderson
Arti,

The problem with watermarks and the File source operator will be fixed in
1.11.2 [1]. This bug was introduced in 1.10.0 and isn't related to the new
WatermarkStrategy API.

[1] https://issues.apache.org/jira/browse/FLINK-19109
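
For context, the periodic assigner quoted further down in this thread reduces to the logic below. This is a minimal sketch in plain Java with no Flink dependency; PeriodicWatermarkSketch and its method names are made up for illustration, and it assumes a non-negative out-of-orderness bound. It shows what the assigner is expected to produce once timestamps keep arriving, independent of the source that feeds it.

```java
public class PeriodicWatermarkSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    public PeriodicWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that the first watermark is Long.MIN_VALUE without underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called per event: remembers the largest event timestamp seen so far. */
    public long onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        return eventTimestamp;
    }

    /** Called periodically: the watermark trails the largest timestamp by the bound. */
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch generator = new PeriodicWatermarkSketch(0);
        for (long ts : new long[] {1_000L, 3_000L, 2_000L}) {
            generator.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + generator.currentWatermark());
        }
    }
}
```

The late event with timestamp 2000 does not move the watermark backwards, because only the maximum timestamp is tracked.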

David

On Wed, Sep 9, 2020 at 2:52 PM Arti Pande  wrote:

> Hi Aljoscha,
>
> By "checkpoints do not work" I mean that from Flink 1.9.2 through 1.11.1,
> when using a File source, the source operator (I am guessing the split
> enumerator or metadata reader) finishes immediately after starting and
> assigning the splits to the split readers. Hence, when the first checkpoint
> is triggered, it sees the state of the first operator, i.e. the source, as
> finished and does not do any checkpointing. That's what you can see in the
> logs and also on the Flink UI for checkpoints. It assumes that the pipeline
> is about to finish shortly and aborts the checkpoint.
>
> This, along with the watermark generation problems, makes it difficult to
> use the file source in production.
>
>
> On Mon, Aug 24, 2020 at 4:01 PM Aljoscha Krettek 
> wrote:
>
>> Hi Arti,
>>
>> what exactly do you mean by "checkpoints do not work"? Are there
>> exceptions being thrown? How are you writing your file-based sources,
>> what API methods are you using?
>>
>> Best,
>> Aljoscha
>>
>> On 20.08.20 16:21, Arti Pande wrote:
>> > Hi Till,
>> >
>> > Thank you for your quick response. Both the
>> AssignerWithPeriodicWatermarks
>> > and WatermarkStrategy I am using are very simple ones.
>> >
>> > *Code for AssignerWithPeriodicWatermarks:*
>> >
>> > public class CustomEventTimeWatermarkGenerator implements
>> > AssignerWithPeriodicWatermarks {
>> >
>> >  private final long maxOutOfOrderness = 0;
>> >  private long currentMaxTimestamp;
>> >
>> >  @Override
>> >  public long extractTimestamp(MyPojo myPojo, long
>> previousTimestamp) {
>> >  long timestamp = myPojo.getInitiationTime().toEpochMilli();
>> >  currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>> >  return timestamp;
>> >  }
>> >
>> >  @Override
>> >  public Watermark getCurrentWatermark() {
>> >  return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>> >  }
>> > }
>> >
>> >
>> > *Code for WatermarkStrategy :*
>> >
>> > WatermarkStrategy watermarkStrategy =
>> >
>> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(0))
>> >  .withTimestampAssigner((event, timestamp) ->
>> > event.getInitiationTime().toEpochMilli());
>> >
>> >
>> > Thanks & regards,
>> > Arti
>> >
>> >
>> > On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann 
>> wrote:
>> >
>> >> Hi Arti,
>> >>
>> >> thanks for sharing this feedback with us. The WatermarkStrategy has
>> been
>> >> introduced quite recently and might have some rough edges. I am
>> pulling in
>> >> Aljoscha and Klou who have worked on this feature and might be able to
>> help
>> >> you. For better understanding your problem, it would be great if you
>> could
>> >> share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with
>> us.
>> >>
>> >> For the file source, the Flink community has recently introduced a new
>> >> source abstraction which will also support checkpoints for file sources
>> >> once the file source connector has been migrated to the new
>> interfaces. The
>> >> community is currently working on it.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Wed, Aug 19, 2020 at 5:38 PM Arti Pande 
>> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> When migrating Stream API based Flink application from 1.9.2 to 1.11.1
>> >>> the watermark generation has issues with file source alone. It works
>> well
>> >>> with Kafka source.
>> >>>
>> >>> With 1.9.2 a custom watermark generator implementation of
>> >>> AssignerWithPeriodicWatermarks was to be used. Starting 1.11.1 it is
>> >>> deprecated and to be replaced with WatermarkStrategy (that combines
>> both
>> >>> WatermarkGenerator and TimestampAssigner).
>> >>>
>> >>> With Flink 1.11.1 when using Kafka source both the above options (i.e.
>> >>> old  AssignerWithPeriodicWatermarks  and new WatermarkStrategy) work
>> >>> perfectly well but with file source none of them works. The watermark
>> >>> assigner never increments the watermarks resulting in stateful
>> operators
>> >>> not clearing their state ever, leading to erroneous results and
>> >>> continuously increasing memory usage.
>> >>>
>> >>> Same code works well with Kafka source. Is this a known issue? If so,
>> any
>> >>> fix planned shortly?
>> >>>
>> >>> A side note (and probably a candidate for separate email, but I will
>> >>> write it here) even checkpoints do not work with File Source since
>> 1.9.2
>> >>> and it is still the problem with 1.11.1. Just wondering if File
>> source with
>> >>> stream API is not a priority in Flink development? If so we can
>> rethink our
>> >>> sources.
>> >>>
>> >>> Thanks & regards,
>> >>> Arti
>> >>>
>> >>
>> >
>>
>>


Re: Difficulties with Minio state storage

2020-09-09 Thread Rex Fenley
Thanks, y'all,

Yangze,
> I've tried to use MinIO as state backend and everything seems works well
For clarity, I'm using RocksDB state backend with Minio as state storage.
> s3.endpoint: http://localhost:9000
Also, I'm doing everything from docker-compose so localhost isn't going to
work in my case.


Arvid,
> You definitely need to use an http endpoint.
I always receive errors like the following when I use http:
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 'http'. The scheme is not
directly supported by Flink and no Hadoop file system to support this
scheme could be loaded. For a full list of supported file systems, please
see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
Whereas s3:// gives me Bad Request errors instead

Thanks


On Wed, Sep 9, 2020 at 8:03 AM Arvid Heise  wrote:

> Hi Rex,
>
> you could also check the end to end tests that use minio in flink's repo.
> You definitely need to use an http endpoint.
>
> The setup [1] uses also another way to specify the s3.path.style.access
> (with dashes). I think we needed it especially for presto. It seems like
> the settings differ a bit across the implementations, so give it a try. It
> might also be something that we should translate.
> For reference, the actual test using presto can be found here [2].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/common_s3_minio.sh#L115
> [2]
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh#L64
>
> On Wed, Sep 9, 2020 at 4:17 AM Yangze Guo  wrote:
>
>> Hi, Rex,
>>
>> I've tried to use MinIO as state backend and everything seems works well.
>> Just sharing my configuration:
>> ```
>> s3.access-key:
>> s3.secret-key:
>> s3.endpoint: http://localhost:9000
>> s3.path.style.access: true
>> state.checkpoints.dir: s3://flink/checkpoints
>> ```
>>
>> I think the problem might be caused by the following reasons:
>> - The MinIO is not well configured.
>> - Maybe you need to create a bucket for it first. In my case, I create
>> a bucket called "flink" first.
>>
>> Best,
>> Yangze Guo
>>
>> On Wed, Sep 9, 2020 at 9:33 AM Rex Fenley  wrote:
>> >
>> > Hello!
>> >
>> > I'm trying to test out Minio as state storage backend using
>> docker-compose on my local machine but keep running into errors that seem
>> strange to me. Any help would be much appreciated :)
>> >
>> > The problem:
>> > With the following environment:
>> >
>> > environment:
>> > - |
>> > FLINK_PROPERTIES=
>> > jobmanager.rpc.address: flink-jobmanager
>> > parallelism.default: 2
>> > s3.access-key: 
>> > s3.secret-key: 
>> > s3.path.style.access: true
>> >
>> > And the following State Backend (with flink-jdbc-test_graph-minio_1
>> being the container serving minio):
>> >
>> > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> > bsEnv.setStateBackend(
>> > new RocksDBStateBackend(
>> > "s3://flink-jdbc-test_graph-minio_1/data/checkpoints:9000",
>> > true
>> > )
>> > )
>> >
>> > And submitting the flink job and saving from another docker container
>> like so:
>> >
>> > flink run -m flink-jdbc-test_flink-jobmanager_1:8081 -c > Name> .jar
>> >
>> > flink savepoint -m flink-jdbc-test_flink-jobmanager_1:8081 
>> s3://flink-jdbc-test_graph-minio_1:9000/data/savepoints
>> >
>> > I end up with the following error:
>> >
>> > Caused by:
>> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
>> com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service:
>> Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID:
>> A7E3BB7EEFB524FD; S3 Extended Request ID:
>> cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=),
>> S3 Extended Request ID:
>> cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=
>> (Path:
>> s3://flink-jdbc-test_graph-minio_1:9000/data/savepoints/savepoint-5c4090-5f90e0cdc603/_metadata)
>> > at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
>> > at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
>> > at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
>> > at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
>> > at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
>> > at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
>> > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
>> > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
>> > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
>> > at
>> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
>> > at
>> 

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-09 Thread Seth Wiesman
Generally +1

The one use case of union state I've seen in production (outside of
sources and sinks) is as a "poor man's" broadcast state. This was
obviously before that feature was added, which is now a few years ago, so I
don't know if those pipelines still exist. FWIW, if they do, the State
Processor API can provide a migration path, as it supports rewriting union
state as broadcast state.

Seth

On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise  wrote:

> +1 to getting rid of non-keyed state in general and of union state
> in particular. I had a hard time wrapping my head around the semantics of
> non-keyed state when designing the rescaling of unaligned checkpoints.
>
> The only plausible use cases are legacy sources and sinks. Both should also
> be reworked and deprecated.
>
> My main question is how to represent state in these two cases. For sources,
> state should probably be bound to splits. In that regard, split (id) may
> act as a key. More generally, there should be probably a concept that
> supersedes keys and includes splits.
>
> For sinks, I can see two cases:
> - Either we are in a keyed context, then state should be bound to the key.
> - Or we are in a non-keyed context, then state might be bound to the split
> (?) in case of a source->sink chaining.
> - Maybe it should also be a new(?) concept like output partition.
>
> It's not clear to me if there are more cases and if we can always find a
> good way to bind state to some sort of key, especially for arbitrary
> communication patterns (which we may need to replace as well potentially).
>
> On Wed, Sep 9, 2020 at 4:09 PM Aljoscha Krettek 
> wrote:
>
> > Hi Devs,
> >
> > @Users: I'm cc'ing the user ML to see if there are any users that are
> > relying on this feature. Please comment here if that is the case.
> >
> > I'd like to discuss the deprecation and eventual removal of UnionList
> > Operator State, aka Operator State with Union Redistribution. If you
> > don't know what I'm talking about you can take a look in the
> > documentation: [1]. It's not documented thoroughly because it started
> > out as mostly an internal feature.
> >
> > The immediate main reason for removing this is also mentioned in the
> > documentation: "Do not use this feature if your list may have high
> > cardinality. Checkpoint metadata will store an offset to each list
> > entry, which could lead to RPC framesize or out-of-memory errors." The
> > insidious part of this limitation is that you will only notice that
> > there is a problem when it is too late. Checkpointing will still work
> > and a program can continue when the state size is too big. The system
> > will only fail when trying to restore from a snapshot that has union
> > state that is too big. This could be fixed by working around that issue
> > but I think there are more long-term issues with this type of state.
> >
> > I think we need to deprecate and remove API for state that is not tied
> > to a key. Keyed state is easy to reason about, the system can
> > re-partition state and also re-partition records and therefore scale the
> > system in and out. Operator state, on the other hand is not tied to a
> > key but an operator. This is a more "physical" concept, if you will,
> > that potentially ties business logic closer to the underlying runtime
> > execution model, which in turns means less degrees of freedom for the
> > framework, that is Flink. This is future work, though, but we should
> > start with deprecating union list state because it is the potentially
> > most dangerous type of state.
> >
> > We currently use this state type internally in at least the
> > StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However,
> > we're in the process of hopefully getting rid of it there with our work
> > on sources and sinks. Before we fully remove it, we should of course
> > signal this to users by deprecating it.
> >
> > What do you think?
> >
> > Best,
> > Aljoscha
> >
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: [DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-09 Thread Arvid Heise
+1 to getting rid of non-keyed state in general and of union state
in particular. I had a hard time wrapping my head around the semantics of
non-keyed state when designing the rescaling of unaligned checkpoints.

The only plausible use cases are legacy sources and sinks. Both should also
be reworked and deprecated.

My main question is how to represent state in these two cases. For sources,
state should probably be bound to splits. In that regard, split (id) may
act as a key. More generally, there should be probably a concept that
supersedes keys and includes splits.

For sinks, I can see two cases:
- Either we are in a keyed context, then state should be bound to the key.
- Or we are in a non-keyed context, then state might be bound to the split
(?) in case of a source->sink chaining.
- Maybe it should also be a new(?) concept like output partition.

It's not clear to me if there are more cases and if we can always find a
good way to bind state to some sort of key, especially for arbitrary
communication patterns (which we may need to replace as well potentially).

On Wed, Sep 9, 2020 at 4:09 PM Aljoscha Krettek  wrote:

> Hi Devs,
>
> @Users: I'm cc'ing the user ML to see if there are any users that are
> relying on this feature. Please comment here if that is the case.
>
> I'd like to discuss the deprecation and eventual removal of UnionList
> Operator State, aka Operator State with Union Redistribution. If you
> don't know what I'm talking about you can take a look in the
> documentation: [1]. It's not documented thoroughly because it started
> out as mostly an internal feature.
>
> The immediate main reason for removing this is also mentioned in the
> documentation: "Do not use this feature if your list may have high
> cardinality. Checkpoint metadata will store an offset to each list
> entry, which could lead to RPC framesize or out-of-memory errors." The
> insidious part of this limitation is that you will only notice that
> there is a problem when it is too late. Checkpointing will still work
> and a program can continue when the state size is too big. The system
> will only fail when trying to restore from a snapshot that has union
> state that is too big. This could be fixed by working around that issue
> but I think there are more long-term issues with this type of state.
>
> I think we need to deprecate and remove API for state that is not tied
> to a key. Keyed state is easy to reason about, the system can
> re-partition state and also re-partition records and therefore scale the
> system in and out. Operator state, on the other hand is not tied to a
> key but an operator. This is a more "physical" concept, if you will,
> that potentially ties business logic closer to the underlying runtime
> execution model, which in turns means less degrees of freedom for the
> framework, that is Flink. This is future work, though, but we should
> start with deprecating union list state because it is the potentially
> most dangerous type of state.
>
> We currently use this state type internally in at least the
> StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However,
> we're in the process of hopefully getting rid of it there with our work
> on sources and sinks. Before we fully remove it, we should of course
> signal this to users by deprecating it.
>
> What do you think?
>
> Best,
> Aljoscha
>


-- 

Arvid Heise | Senior Java Developer





Re: Difficulties with Minio state storage

2020-09-09 Thread Arvid Heise
Hi Rex,

you could also check the end-to-end tests that use MinIO in Flink's repo.
You definitely need to use an http endpoint.

The setup [1] also uses another way to specify s3.path.style.access
(with dashes). I think we needed it especially for Presto. It seems like
the settings differ a bit across the implementations, so give it a try. It
might also be something that we should translate.
For reference, the actual test using Presto can be found here [2].

[1]
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/common_s3_minio.sh#L115
[2]
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh#L64

On Wed, Sep 9, 2020 at 4:17 AM Yangze Guo  wrote:

> Hi, Rex,
>
> I've tried to use MinIO as state backend and everything seems works well.
> Just sharing my configuration:
> ```
> s3.access-key:
> s3.secret-key:
> s3.endpoint: http://localhost:9000
> s3.path.style.access: true
> state.checkpoints.dir: s3://flink/checkpoints
> ```
>
> I think the problem might be caused by the following reasons:
> - The MinIO is not well configured.
> - Maybe you need to create a bucket for it first. In my case, I create
> a bucket called "flink" first.
>
> Best,
> Yangze Guo
>
> On Wed, Sep 9, 2020 at 9:33 AM Rex Fenley  wrote:
> >
> > Hello!
> >
> > I'm trying to test out Minio as state storage backend using
> docker-compose on my local machine but keep running into errors that seem
> strange to me. Any help would be much appreciated :)
> >
> > The problem:
> > With the following environment:
> >
> > environment:
> > - |
> > FLINK_PROPERTIES=
> > jobmanager.rpc.address: flink-jobmanager
> > parallelism.default: 2
> > s3.access-key: 
> > s3.secret-key: 
> > s3.path.style.access: true
> >
> > And the following State Backend (with flink-jdbc-test_graph-minio_1
> being the container serving minio):
> >
> > val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> > bsEnv.setStateBackend(
> > new RocksDBStateBackend(
> > "s3://flink-jdbc-test_graph-minio_1/data/checkpoints:9000",
> > true
> > )
> > )
> >
> > And submitting the flink job and saving from another docker container
> like so:
> >
> > flink run -m flink-jdbc-test_flink-jobmanager_1:8081 -c 
> .jar
> >
> > flink savepoint -m flink-jdbc-test_flink-jobmanager_1:8081 
> s3://flink-jdbc-test_graph-minio_1:9000/data/savepoints
> >
> > I end up with the following error:
> >
> > Caused by:
> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
> com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service:
> Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID:
> A7E3BB7EEFB524FD; S3 Extended Request ID:
> cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=),
> S3 Extended Request ID:
> cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=
> (Path:
> s3://flink-jdbc-test_graph-minio_1:9000/data/savepoints/savepoint-5c4090-5f90e0cdc603/_metadata)
> > at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
> > at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
> > at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
> > at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
> > at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
> > at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
> > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
> > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
> > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
> > at
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:141)
> > at
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
> > at
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:169)
> > at
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65)
> > at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109)
> > at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:306)
> > ... 10 more
> > Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad
> Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request;
> Request ID: A7E3BB7EEFB524FD; S3 Extended Request ID:
> cJOtc6E3Kb+U5hgbkA+09Dd/ouDHBGL2ftb1pGHpIwFgd6tE461nkaDtjOj40zbWEpFAcMOEmbY=)
> > at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
> >
> > If I add to the environment to include:
> > ...
> > s3.endpoint: s3://flink-jdbc-test_graph-minio_1:9000
> > ...
> >

[subject partly lost to encoding damage] flink ... GMV ...

2020-09-09 Thread [sender name lost to encoding damage]
[Message body lost to encoding damage. Legible fragments: 1.11+, sql,
update_before, update_after, Flink, after, before.]

"user-zh"

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication

xuzh

[DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-09 Thread Aljoscha Krettek

Hi Devs,

@Users: I'm cc'ing the user ML to see if there are any users that are 
relying on this feature. Please comment here if that is the case.


I'd like to discuss the deprecation and eventual removal of UnionList 
Operator State, aka Operator State with Union Redistribution. If you 
don't know what I'm talking about you can take a look in the 
documentation: [1]. It's not documented thoroughly because it started 
out as mostly an internal feature.


The immediate main reason for removing this is also mentioned in the 
documentation: "Do not use this feature if your list may have high 
cardinality. Checkpoint metadata will store an offset to each list 
entry, which could lead to RPC framesize or out-of-memory errors." The 
insidious part of this limitation is that you will only notice that 
there is a problem when it is too late. Checkpointing will still work 
and a program can continue when the state size is too big. The system 
will only fail when trying to restore from a snapshot that has union 
state that is too big. This could be fixed by working around that issue 
but I think there are more long-term issues with this type of state.
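
For readers unfamiliar with the two redistribution modes, here is a simplified model in plain Java. It is a sketch under stated assumptions, not Flink's implementation: the class and method names are made up, and split redistribution is modeled as a simple round-robin. It shows that with union redistribution every subtask receives every entry on restore, so the amount of restored state grows with the parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RedistributionSketch {

    /** Union redistribution: every new subtask receives all entries of all old subtasks. */
    public static <T> List<List<T>> union(List<List<T>> oldSubtaskState, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> part : oldSubtaskState) {
            all.addAll(part);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    /** Split redistribution, modeled here as round-robin: each entry goes to exactly one subtask. */
    public static <T> List<List<T>> split(List<List<T>> oldSubtaskState, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> part : oldSubtaskState) {
            for (T entry : part) {
                result.get(next % newParallelism).add(entry);
                next++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // State of three old subtasks, restored with a new parallelism of two.
        List<List<String>> old = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c", "d"), Arrays.asList("e", "f"));
        System.out.println("union: " + union(old, 2));
        System.out.println("split: " + split(old, 2));
    }
}
```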


I think we need to deprecate and remove the API for state that is not tied
to a key. Keyed state is easy to reason about: the system can
re-partition state and also re-partition records, and therefore scale the
system in and out. Operator state, on the other hand, is not tied to a
key but to an operator. This is a more "physical" concept, if you will,
that potentially ties business logic closer to the underlying runtime
execution model, which in turn means fewer degrees of freedom for the
framework, that is, Flink. This is future work, though; we should
start with deprecating union list state because it is potentially the
most dangerous type of state.


We currently use this state type internally in at least the 
StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However, 
we're in the process of hopefully getting rid of it there with our work 
on sources and sinks. Before we fully remove it, we should of course 
signal this to users by deprecating it.


What do you think?

Best,
Aljoscha


Re: How to get Latency Tracking results?

2020-09-09 Thread David Anderson
Pankaj,

I just checked, and the latency metrics for SocketWindowWordCount show up
just fine for me with Flink 1.11.1. For me, the latency metrics existed
even before I provided any data on the socket for the job to process. This
makes sense, as the latency tracking markers will propagate through the job
graph and provide measurements despite the lack of any real data for the
job to process.

These metrics have horrible names, but for me,

curl -s http://localhost:8081/jobs/69f954cb93c680fb2b210f877c377131/metrics

reveals a bunch of latency metrics, and, for example,

curl -s
http://localhost:8081/jobs/69f954cb93c680fb2b210f877c377131/metrics?get=latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.17fbfcaabad45985bbdf4da0490487e3.operator_subtask_index.0.latency_p999

returns

[{"id":"latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.17fbfcaabad45985bbdf4da0490487e3.operator_subtask_index.0.latency_p999","value":"105.0"}]
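
If it helps with post-processing, here is a minimal Python sketch that splits metric ids of the shape shown above into their parts. It is not part of Flink: the function names are made up for illustration, and it assumes ids always have the eight dot-separated parts seen in this response.

```python
import json
from typing import Dict, List


def parse_latency_metric_id(metric_id: str) -> Dict[str, str]:
    """Split an id of the form
    latency.source_id.<S>.operator_id.<O>.operator_subtask_index.<N>.<stat>
    into its parts. Returns an empty dict for anything else."""
    parts = metric_id.split(".")
    if len(parts) != 8 or parts[0] != "latency":
        return {}
    return {
        parts[1]: parts[2],  # source_id
        parts[3]: parts[4],  # operator_id
        parts[5]: parts[6],  # operator_subtask_index
        "stat": parts[7],    # e.g. latency_p999
    }


def latency_values(response_body: str) -> List[dict]:
    """Keep only latency entries that carry a value, with the value as float."""
    rows = []
    for entry in json.loads(response_body):
        parsed = parse_latency_metric_id(entry["id"])
        if parsed and "value" in entry:
            parsed["value"] = float(entry["value"])
            rows.append(parsed)
    return rows
```

Feeding it the body returned by the second curl call gives one row with `stat` set to `latency_p999`; a listing without latency metrics gives an empty list.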

Best,
David


On Wed, Sep 9, 2020 at 3:16 PM Pankaj Chand 
wrote:

> Hi David,
>
> Thanks for replying! Sorry, I forgot to mention I am using Flink Version:
> 1.11.1, Commit ID: 7eb514a.
> Is it possible that the default SocketWindowWordCount job is too simple to
> generate Latency metrics? Or that the latency metrics disappear from the
> output JSON when the data ingestion is zero?
>
> Thanks,
>
> Pankaj
>
>
> On Wed, Sep 9, 2020 at 6:27 AM David Anderson 
> wrote:
>
>> Pankaj,
>>
>> The Flink web UI doesn't do any visualizations of histogram metrics, so
>> the only way to access the latency metrics is either through the REST api
>> or a metrics reporter.
>>
>> The REST endpoint you tried is the correct place to find these metrics in
>> all recent versions of Flink, but somewhere back before Flink 1.5 or 1.6
>> (if I recall correctly) these metrics were task metrics. So if you are
>> using an older version of Flink you'll need to dig deeper. I believe you'll
>> find them in
>>
>> /jobs//vertices//subtasks/metrics
>>
>> Regards,
>> David
>>
>>
>>
>> On Tue, Sep 8, 2020 at 10:52 PM Pankaj Chand 
>> wrote:
>>
>>> Hello,
>>>
>>> How do I visualize (or extract) the results for Latency Tracking for a
>>> Flink local cluster? I set "metrics.latency.interval 100" in the
>>> conf/flink-conf.yaml file, and started the cluster and
>>> SocketWindowWordCount job. However, I could not find the latency
>>> distributions anywhere in the web UI, nor are there any latency metrics in
>>> the Metrics dropdown box for either task.
>>>
>>> I also set "metrics.latency.granularity "operator"" in
>>> conf/flink-conf.yaml, but that did not help.
>>>
>>> When I tried to query the REST endpoint, I got the following output
>>> which did not seem to contain anything related to latency:
>>>
>>> $ curl -s
>>> http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics
>>>
>>>
>>> [{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}]
>>>
>>> Thanks,
>>>
>>> Pankaj
>>>
>>


Re: PyFlink - detect if environment is a StreamExecutionEnvironment

2020-09-09 Thread Manas Kale
Hi Xingbo and Till,
Thank you for your help!

On Wed, Sep 2, 2020 at 9:38 PM Xingbo Huang  wrote:

> Hi Manas,
>
> As Till said, you need to check whether the execution environment used is
> LocalStreamEnvironment. To do that, get the class object of the underlying
> Java object through py4j. Take a look at the example I wrote below; I hope
> it helps.
>
> ```
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.java_gateway import get_gateway
> from py4j.java_gateway import get_java_class
>
>
> def test():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     table_env = StreamTableEnvironment.create(
>         env, environment_settings=EnvironmentSettings.new_instance()
>         .in_streaming_mode().use_blink_planner().build())
>     gateway = get_gateway()
>
>     # get the execution environment class
>     env_class = table_env._j_tenv.getPlanner().getExecEnv().getClass()
>
>     # get the LocalStreamEnvironment class
>     local_stream_environment_class = get_java_class(
>         gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment)
>     print(env_class == local_stream_environment_class)
>
>
> if __name__ == '__main__':
>     test()
>
> ```
>
>
> Best,
> Xingbo
>
> On Wed, Sep 2, 2020 at 5:03 PM Till Rohrmann wrote:
>
>> Hi Manas,
>>
>> I am not entirely sure but you might try to check whether
>> env._j_stream_execution_environment is an instance of
>> gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment
>> via Python's isinstance function.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 2, 2020 at 5:46 AM Manas Kale  wrote:
>>
>>> Hi Xingbo,
>>> Thank you for clarifying that. I am indeed maintaining a different
>>> version of the code by commenting those lines, but I was just wondering if
>>> it was possible to detect the environment programmatically.
>>>
>>> Regards,
>>> Manas
>>>
>>> On Wed, Sep 2, 2020 at 7:32 AM Xingbo Huang  wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> When running locally, you need
>>>> `ten_sec_summaries.get_job_client().get_job_execution_result().result()` to
>>>> wait for the job to finish. However, when you submit to the cluster, you
>>>> need to delete this code. In my opinion, the currently feasible solution is
>>>> to prepare two versions of the code, although this is annoying. After all,
>>>> running jobs locally is usually for testing, so it should be acceptable to
>>>> maintain a separate version.
>>>> In the long run, the Flink framework itself should behave differently
>>>> according to the environment, so that users don't need to prepare
>>>> different code.
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> On Tue, Sep 1, 2020 at 3:00 PM Manas Kale wrote:

> Hi,
> I am trying to submit a pyFlink job in detached mode using the command:
>
> ../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j
> flink-sql-connector-kafka_2.11-1.11.0.jar
>
> The jobs are submitted successfully but the command does not return. I
> realized that was because I had the following line in
> basic_streaming_job.py:
>
> ten_sec_summaries.get_job_client().get_job_execution_result().result()
>
> This statement is useful when testing this locally within a
> minicluster (using python basic_streaming_job.py) but not needed when the
> job is submitted to a cluster.
>
> So I would like to programmatically detect if the
> StreamExecutionEnvironment is a localStreamEnvironment and execute
> the above snippet accordingly. How do I do this?
>
>
> Thanks,
> Manas
>



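Re: Time zone mix-up when inserting localtimestamp and current_timestamp into MySQL

2020-09-09 Thread Leonard Xu
Hi,

> After inserting into MySQL this way, the time zone of dtm is wrong: the inserted value should be the current time minus 8 hours, but it became the current time minus 21 hours

Ending up with the current time minus 21 hours seems very strange. Could you paste the complete code and data?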

Best
Leonard

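Re: flink-sql consuming a Kafka-backed Flink table: does every select on this Flink table effectively use a different group id?

2020-09-09 Thread Leonard Xu
Hi

Could you paste the options of your Kafka table and the checkpoint configuration of your job?
What is certain is that both jobs use the same group id.
If you have not configured checkpointing, the Flink Kafka consumer's enable.auto.commit is set to 
false by default, so the offsets of the corresponding group are not committed. In that case your two jobs only use the group id to determine the starting offset for consumption, and the data they receive is identical.
You can read the explanation of this mechanism in [1][2].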

Best
Leonard

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#%E9%85%8D%E7%BD%AE-kafka-consumer-%E5%BC%80%E5%A7%8B%E6%B6%88%E8%B4%B9%E7%9A%84%E4%BD%8D%E7%BD%AE
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html#kafka-consumer-%E6%8F%90%E4%BA%A4-offset-%E7%9A%84%E8%A1%8C%E4%B8%BA%E9%85%8D%E7%BD%AE

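> On Sep 9, 2020, at 16:24, 凌天荣 <466792...@qq.com> wrote:
> 
> I have a Flink table backed by Kafka. 
> I start two jobs at the same time that each select from this same table, and each job receives the same data. Does that mean every select on this table uses a different group 
> id?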



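Re: Difference between Row and RowData

2020-09-09 Thread Danny Chan
Row is what is exposed to DataStream users, and a RowKind can be set on it. RowData is the internal data structure of the Table runtime, which improves serialization in some scenarios. 
Flink SQL uses RowData directly, and advanced users who want to work with RowData directly can of course do so as well. The new connector API in 1.11 
exposes RowData to connector developers.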

Best,
Danny Chan



> On Sep 9, 2020, at 1:51 PM, 刘首维 wrote:
> 
> Hi all,
> 
>
> What are the differences and the relationship between `org.apache.flink.types.Row` and `org.apache.flink.table.data.RowData`?



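Re: flink-cdc sink mysql problem

2020-09-09 Thread Leonard Xu
Hi

This error means the jar was not loaded correctly. The code itself looks fine. After adding the jar you need to restart the cluster; did you restart it when testing?


Best regards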
Leonard

> On Sep 9, 2020, at 16:48, 杨帅统 wrote:
> 
> My company wants to synchronize data from MySQL database A to database B in real time, and I wonder whether the CDC feature of Flink 1.11 can do this.
> For testing I defined one CDC source table and one sink table
> CREATE TABLE pvuv_test (
>  id INT,
>  dt STRING,
>  pv STRING,
>  uv STRING ,
>  proc_time AS PROCTIME() -- this field must be specified when using a dimension table
> ) WITH (
>  'connector' = 'mysql-cdc', -- connector
>  'hostname' = 'localhost',   -- MySQL address
>  'port' = '3306',  -- MySQL port
>  'username' = 'root',  -- MySQL username
>  'password' = 'rootzs',  -- MySQL password
>  'database-name' = 'etc_demo', -- database name
>  'table-name' = 'puuv_test'
> );
> CREATE TABLE pvuv_test_back (
>  id INT,
>  dt STRING,
>  pv STRING,
>  uv STRING ,
>  proc_time AS PROCTIME() -- this field must be specified when using a dimension table
> ) WITH (
>  'connector' = 'mysql-cdc', -- connector
>  'hostname' = 'localhost',   -- MySQL address
>  'port' = '3306',  -- MySQL port
>  'username' = 'root',  -- MySQL username
>  'password' = 'rootzs',  -- MySQL password
>  'database-name' = 'etc_demo', -- database name
>  'table-name' = 'puuv_test_back'
> );
> But when I execute the statement below through the SQL Client, it fails
> INSERT INTO pvuv_test_back
> SELECT * FROM pvuv_test;
> -
> The error message is as follows
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Could not find any factory 
> for identifier 'mysql-cdc' that implements 
> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
> Available factory identifiers are:
> blackhole
> elasticsearch-6
> kafka
> print
> 
> 
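> The mysql-cdc jar is already in the Flink/lib directory, so I don't know where the problem is
> 
> 
> Finally, regarding the requirement of real-time MySQL-to-MySQL synchronization, I'd like to know whether anyone has other solutions or ideas. Thanks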



Re: How to get Latency Tracking results?

2020-09-09 Thread Pankaj Chand
Hi David,

Thanks for replying! Sorry, I forgot to mention I am using Flink Version:
1.11.1, Commit ID: 7eb514a.
Is it possible that the default SocketWindowWordCount job is too simple to
generate Latency metrics? Or that the latency metrics disappear from the
output JSON when the data ingestion is zero?

Thanks,

Pankaj


On Wed, Sep 9, 2020 at 6:27 AM David Anderson  wrote:

> Pankaj,
>
> The Flink web UI doesn't do any visualizations of histogram metrics, so
> the only way to access the latency metrics is either through the REST api
> or a metrics reporter.
>
> The REST endpoint you tried is the correct place to find these metrics in
> all recent versions of Flink, but somewhere back before Flink 1.5 or 1.6
> (if I recall correctly) these metrics were task metrics. So if you are
> using an older version of Flink you'll need to dig deeper. I believe you'll
> find them in
>
> /jobs//vertices//subtasks/metrics
>
> Regards,
> David
>
>
>
> On Tue, Sep 8, 2020 at 10:52 PM Pankaj Chand 
> wrote:
>
>> Hello,
>>
>> How do I visualize (or extract) the results for Latency Tracking for a
>> Flink local cluster? I set "metrics.latency.interval 100" in the
>> conf/flink-conf.yaml file, and started the cluster and
>> SocketWindowWordCount job. However, I could not find the latency
>> distributions anywhere in the web UI, nor are there any latency metrics in
>> the Metrics dropdown box for either task.
>>
>> I also set "metrics.latency.granularity "operator"" in
>> conf/flink-conf.yaml, but that did not help.
>>
>> When I tried to query the REST endpoint, I got the following output which
>> did not seem to contain anything related to latency:
>>
>> $ curl -s
>> http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics
>>
>>
>> [{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}]
>>
>> Thanks,
>>
>> Pankaj
>>
>


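Re: sql-client submission fails with: UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-09-09 Thread Leonard Xu
Hi 
This error usually means that your query is an upsert query and the PK could not be inferred, hence the error.
If this is a custom connector, it should implement the DynamicTableSink interface rather than the old UpsertStreamTableSink interface. 
Implementing DynamicTableSink lets you define the PK on the table, so no inference is needed.
Judging from this error, the kudu connector implements the old UpsertStreamTableSink. 
The workaround is to rewrite your query so that the PK can be inferred from it.

Best regards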
Leonard



> On Sep 9, 2020, at 20:27, kandy.wang wrote:
> 
> 
> 
> I implemented my own kudu connector and got this error:
> 
> 
> 2020-09-09 18:34:59,442 WARN  org.apache.flink.table.client.cli.CliClient 
>  [] - Could not execute SQL statement.
> 
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL 
> statement.
> 
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:579)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:515)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:596) 
> ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:315) 
> ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_262]
> 
>at 
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
> Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink 
> requires that Table has a full primary keys if it is updated.
> 
>at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>  ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> 
>at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)
>  

Re: Watermark generation issues with File sources in Flink 1.11.1

2020-09-09 Thread Arti Pande
Hi Aljoscha,

By "checkpoints do not work" I mean that, from Flink 1.9.2 through
1.11.1, when using a file source the source operator (I am guessing the split
enumerator or metadata reader) finishes immediately after starting (and
assigning the splits to the split readers). Hence, when the first checkpoint
is triggered, it sees the state of the first operator, i.e. the source, as
finished and does not do any checkpointing. That's what you can see in the
logs and also on the Flink UI for checkpoints. It assumes that the pipeline is
about to finish shortly and aborts the checkpoint.

This, along with the watermark generation problems, makes it difficult
to use the file source in production.


On Mon, Aug 24, 2020 at 4:01 PM Aljoscha Krettek 
wrote:

> Hi Arti,
>
> what exactly do you mean by "checkpoints do not work"? Are there
> exceptions being thrown? How are you writing your file-based sources,
> what API methods are you using?
>
> Best,
> Aljoscha
>
> On 20.08.20 16:21, Arti Pande wrote:
> > Hi Till,
> >
> > Thank you for your quick response. Both the
> AssignerWithPeriodicWatermarks
> > and WatermarkStrategy I am using are very simple ones.
> >
> > *Code for AssignerWithPeriodicWatermarks:*
> >
> > public class CustomEventTimeWatermarkGenerator implements
> > AssignerWithPeriodicWatermarks<MyPojo> {
> >
> >     private final long maxOutOfOrderness = 0;
> >     private long currentMaxTimestamp;
> >
> >     @Override
> >     public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
> >         long timestamp = myPojo.getInitiationTime().toEpochMilli();
> >         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
> >         return timestamp;
> >     }
> >
> >     @Override
> >     public Watermark getCurrentWatermark() {
> >         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
> >     }
> > }
> >
> >
> > *Code for WatermarkStrategy :*
> >
> > WatermarkStrategy<MyPojo> watermarkStrategy =
> >     WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
> >         .withTimestampAssigner((event, timestamp) ->
> >             event.getInitiationTime().toEpochMilli());
> >
> >
> > Thanks & regards,
> > Arti
> >
> >
> > On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann 
> wrote:
> >
> >> Hi Arti,
> >>
> >> thanks for sharing this feedback with us. The WatermarkStrategy has been
> >> introduced quite recently and might have some rough edges. I am pulling
> in
> >> Aljoscha and Klou who have worked on this feature and might be able to
> help
> >> you. For better understanding your problem, it would be great if you
> could
> >> share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.
> >>
> >> For the file source, the Flink community has recently introduced a new
> >> source abstraction which will also support checkpoints for file sources
> >> once the file source connector has been migrated to the new interfaces.
> The
> >> community is currently working on it.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Aug 19, 2020 at 5:38 PM Arti Pande 
> wrote:
> >>
> >>> Hi,
> >>>
> >>> When migrating Stream API based Flink application from 1.9.2 to 1.11.1
> >>> the watermark generation has issues with file source alone. It works
> well
> >>> with Kafka source.
> >>>
> >>> With 1.9.2 a custom watermark generator implementation of
> >>> AssignerWithPeriodicWatermarks was to be used. Starting 1.11.1 it is
> >>> deprecated and to be replaced with WatermarkStrategy (that combines
> both
> >>> WatermarkGenerator and TimestampAssigner).
> >>>
> >>> With Flink 1.11.1 when using Kafka source both the above options (i.e.
> >>> old  AssignerWithPeriodicWatermarks  and new WatermarkStrategy) work
> >>> perfectly well but with file source none of them works. The watermark
> >>> assigner never increments the watermarks resulting in stateful
> operators
> >>> not clearing their state ever, leading to erroneous results and
> >>> continuously increasing memory usage.
> >>>
> >>> Same code works well with Kafka source. Is this a known issue? If so,
> any
> >>> fix planned shortly?
> >>>
> >>> A side note (and probably a candidate for separate email, but I will
> >>> write it here) even checkpoints do not work with File Source since
> 1.9.2
> >>> and it is still the problem with 1.11.1. Just wondering if File source
> with
> >>> stream API is not a priority in Flink development? If so we can
> rethink our
> >>> sources.
> >>>
> >>> Thanks & regards,
> >>> Arti
> >>>
> >>
> >
>
>


Re: Flink 1.8.3 GC issues

2020-09-09 Thread Piotr Nowojski
Hi Josson,

Thanks for getting back.

What are the JVM settings and in particular GC settings that you are using
(G1GC?)?
It could also be an issue that in 1.4 you were just slightly below the
threshold of GC issues, while in 1.8, something is using a bit more memory,
causing the GC issues to appear? Have you tried just increasing the heap
size?
Have you tried comparing, at job startup, the usage and size of the JVM's
memory pools between Flink 1.4 and 1.8? Maybe that can point us in
the right direction.

> My understanding on back pressure is that it is not based on Heap memory
> but based on how fast the Network buffers are filled. Is this correct?
> Does Flink use TCP connection to communicate between tasks if the tasks
> are in the same Task manager?

No, local input channels are being used then, but memory for network
buffers is assigned to tasks regardless of the fraction of local input
channels in the task. However with just single taskmanager and parallelism
of 4, the amount of the memory used by the network stack should be
insignificant, at least as long as you have a reasonably sized job graph
(32KB * (2 * parallelism + 7) * number of tasks).
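
To put a number on that estimate, here is the same arithmetic as a small Python sketch. The function name is made up for illustration, and the task count of 10 is an invented example value, not something taken from your job.

```python
def network_buffer_bytes(parallelism: int, num_tasks: int,
                         buffer_kb: int = 32) -> int:
    """Rough estimate from the formula above:
    32KB * (2 * parallelism + 7) * number of tasks."""
    return buffer_kb * 1024 * (2 * parallelism + 7) * num_tasks


# Parallelism 4 as in your setup, with a hypothetical 10 tasks.
estimate = network_buffer_bytes(parallelism=4, num_tasks=10)
print(estimate)                  # 4915200 bytes
print(estimate / (1024 * 1024))  # 4.6875 MiB
```

Under these assumed numbers the network stack would need only a few megabytes.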

> What I noticed in Flink 1.4 is that it doesn't read data from Kafka if
> there is not sufficient heap memory to process data. Somehow this is not
> happening in Flink 1.8 and it fills the heap soon enough not to get
> GCed/Finalized. Any change around this between Flink 1.4 and Flink 1.8.

No, there were no changes in this part as far as I remember. Tasks, when
producing records, serialise them and put them into the network
buffers. If there are no available network buffers, the task is back
pressured and stops processing new records.

Best regards,
Piotrek


On Tue, Sep 8, 2020 at 21:51 Josson Paul wrote:

> Hi Piotr,
>2) SystemProcessingTimeService holds the HeapKeyedStateBackend and
> HeapKeyedStateBackend has lot of Objects and that is filling the Heap
>3) I am not using Flink Kafka Connector. But we are using Apache Beam
> kafka connector.  There is a change in the Apache Beam version. But the
> kafka client we are using is the same as the one which was working in the
> other cluster where  Flink was 1.4.
>   *There is no change in Hardware/Java/Kafka/Kafka Client/Application
> between the cluster which is working and not working*
>
> I am aware of the memory changes and network buffer changes between 1.4
> and 1.8.
>
> Flink 1.4 had network buffers on Heap and 1.8 network buffers are on the
> native memory. I modified the Flink 1.8 code to put it back to Heap memory
> but the issue didn't get resolved.
>
> Mine is a streaming job so we set 'taskmanager.memory.fraction' to very
> minimal and that heap is fully available for user data.
>
> Flink 1.4 was not using Credit based Flow control and Flink 1.8 uses
> Credit based Flow control. *Our set up has only 1 task manager and 4
> parallelisms*.  According to this video
> https://www.youtube.com/watch?v=AbqatHF3tZI_channel=FlinkForward (
> *16:21*) if tasks are in same task manager,  Flink doesn't use Credit
> Based Flow control. Essentially no change between Flink 1.4 and 1.8 in *our
> set up*. Still I tried to change the Credit Based Flow Control to False
> and test my setup. The problem persists.
>
> What I noticed in Flink 1.4 is that it doesn't read data from Kafka if
> there is not sufficient heap memory to process data. Somehow this is not
> happening in Flink 1.8 and it fills the heap soon enough not to get
> GCed/Finalized. Any change around this between Flink 1.4 and Flink 1.8.
>
> My understanding on back pressure is that it is not based on Heap memory
> but based on how fast the Network buffers are filled. Is this correct?.
> Does Flink use TCP connection to communicate between tasks if the tasks
> are in the same Task manager?.
>
> Thanks,
> josson
>
> On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski 
> wrote:
>
>> Hi Josson,
>>
>> 2. Are you sure that all/vast majority of those objects are pointing
>> towards SystemProcessingTimeService? And is this really the problem of
>> those objects? Are they taking that much of the memory?
>> 3. It still could be Kafka's problem, as it's likely that between 1.4 and
>> 1.8.x we bumped Kafka dependencies.
>>
>> Frankly, if that's not some other external dependency issue, I would
>> expect that the problem might lie somewhere else completely. Flink's code
>> relying on finalisation hasn't changed since 2015/2016. On the other
>> hand there were quite a bit of changes between 1.4 and 1.8.x, some of them
>> were affecting memory usage. Have you read release notes for versions 1.5,
>> 1.6, 1.7 and 1.8? In particular both 1.5 [1] and 1.8 [2] have memory
>> related notes that could be addressed via configuration changes.
>>
>> Thanks,
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-notes/flink-1.8.html

Re: Slow Performance inquiry

2020-09-09 Thread Heidi Hazem Mohamed
Hi Walther,

Many thanks for your answer, I declared the state type as below


ValueStateDescriptor<SparseBinaryMatrix> descriptor =
    new ValueStateDescriptor<>(
        "Rating Matrix",
        TypeInformation.of(new TypeHint<SparseBinaryMatrix>() {
        }));

Is there a better way?

Regards,

Heidy


From: Timo Walther 
Sent: Wednesday, September 9, 2020 1:58 PM
To: user@flink.apache.org 
Subject: Re: Slow Performance inquiry

Hi Hazem,

I guess your performance is mostly driven by the serialization overhead
in this case. How do you declare your state type?

Flink comes with different serializers. Not all of them are extracted
automatically when using reflective extraction methods:

- Note that a `Serializable` declaration has no effect for Flink, other
than NOT using Flink's efficient serializers.
- Flink's POJO serializer only works with a default constructor present.
- Row needs an explicit declaration of fields.

Regards,
Timo


On 09.09.20 13:08, Heidi Hazem Mohamed wrote:
> Dear,
>
> I am writing a Flink program (a recommender system) that needs a matrix as
> state, namely the rating matrix. Since the matrix is very sparse, I
> implemented a sparse binary matrix that stores only the ones rather than
> the whole matrix, to save memory. I use it as a data type and keep it in a
> value state, but unexpectedly the performance became terrible and the job
> became very slow. Any suggestions as to what the problem might be?
>
> My first implementation for the rating matrix state :
>
> MapState>ratingMatrix;
>
>
> The second implementation (the slow one) for rating matrix state:
>
> ValueStateuserItemRatingHistory;
>
>
> and this is part of the SparseBinaryMatrix class
>
> public class SparseBinaryMatrix implements Serializable {
>
>     private ArrayList content;
>     private int rowLength;
>
>     private HashMap columnLabels;
>     private HashMap inverseColumnLabels;
>
>     private HashMap rowLabels;
>     private HashMap inverseRowLabels;
>
>     private enum LabelerType {Row, Column};
>
>     public Integer colNumber;
>     public Integer rowNumber;
>
>     // This constructor initializes the matrix with zeros
>     public SparseBinaryMatrix(int rows, int columns) {
>         content = new ArrayList<>(rows);
>         rowLength = columns;
>         // for (int i = 0; i < rows; i++)
>         //     content.add(new Row(columns));
>     }
>
>
>
> Could depending on another class (Row) lead to this terrible performance?
> Row is a class I implemented, and this is part of it
>
> public class Row implements Serializable {
>     // This is an alternating sorted array
>     private ArrayList content;
>     private int length = 0;
>
>     public Row(int numbColumns) {
>         length = numbColumns;
>         for (int i = 0; i < numbColumns; i++)
>             setColumnToZero(i);
>     }
>
>     public Row(int[] initialValues) {
>         length = initialValues.length;
>         content = new ArrayList<>(length);
>         for (int i = 0; i < length; i++)
>             setColumn(i, initialValues[i]);
>     }
>
>
> Regards,
>
> Heidy
>
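
The idea described in the quoted message, storing only the ones of a sparse binary matrix, can be sketched in a few lines of Python. This is an illustration of the data structure only, not the poster's implementation. The class layout (a dict from row index to a set of column indices) and all method names are assumptions.

```python
from typing import Dict, Set


class SparseBinaryMatrix:
    """Binary matrix that stores only the positions of the ones."""

    def __init__(self, rows: int, columns: int) -> None:
        self.rows = rows
        self.columns = columns
        self._ones: Dict[int, Set[int]] = {}  # row index -> columns set to 1

    def _check(self, row: int, col: int) -> None:
        if not (0 <= row < self.rows and 0 <= col < self.columns):
            raise IndexError("cell (%d, %d) is out of bounds" % (row, col))

    def set(self, row: int, col: int, value: int) -> None:
        self._check(row, col)
        if value:
            self._ones.setdefault(row, set()).add(col)
        else:
            cols = self._ones.get(row)
            if cols is not None:
                cols.discard(col)
                if not cols:
                    del self._ones[row]  # drop rows that became all zeros

    def get(self, row: int, col: int) -> int:
        self._check(row, col)
        return 1 if col in self._ones.get(row, ()) else 0

    def count_ones(self) -> int:
        return sum(len(cols) for cols in self._ones.values())
```

Memory use in this sketch grows with the number of ones rather than with rows times columns.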



sql-client submission fails with: UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-09-09 Thread kandy.wang


I implemented my own kudu connector and got this error:


2020-09-09 18:34:59,442 WARN  org.apache.flink.table.client.cli.CliClient   
   [] - Could not execute SQL statement.

org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL 
statement.

at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:579)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:515)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:596) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:315) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_262]

at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

Caused by: org.apache.flink.table.api.TableException: UpsertStreamTableSink 
requires that Table has a full primary keys if it is updated.

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
 ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)
 ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:327)
 ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:284)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]

at 

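Yarn-Session: Kerberos problem after running for a long time

2020-09-09 Thread hua mulan
After a session has been running for a long time, submitting a job fails with a Kerberos keytab authentication error. How should I troubleshoot this?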


2020-09-08 14:58:39,030 INFO  org.apache.flink.yarn.YarnResourceManager 
- Received new container: container_1594117729227_83900_01_39 - 
Remaining pending container requests: 1
2020-09-08 14:58:39,030 INFO  org.apache.flink.yarn.YarnResourceManager 
- Removing container request Capability[]Priority[1]. Pending container requests 0.
2020-09-08 14:58:39,030 INFO  org.apache.flink.yarn.YarnResourceManager 
- Adding keytab 
hdfs://sjh-kf-cm01:8020/user/hive/.flink/application_1594117729227_83900/hive_nohost.keytab
 to the AM container klocal resource bucet
2020-09-08 14:58:39,035 WARN  org.apache.hadoop.ipc.Client  
- Exception encountered while connecting to the server : 
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]
2020-09-08 14:58:39,040 ERROR org.apache.flink.yarn.YarnResourceManager 
- Could not start TaskManager in container 
container_1594117729227_83900_01_39.
java.io.IOException: Failed on local exception: java.io.IOException: 
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]; Host Details : local host is: "sjh-kf-data02/172.29.51.73"; destination 
host is: "sjh-kf-cm01":8020;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
at org.apache.hadoop.ipc.Client.call(Client.java:1480)
at org.apache.hadoop.ipc.Client.call(Client.java:1413)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy38.getFileInfo(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:776)
at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy39.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2117)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:224)
at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:429)
at 
org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:554)
at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:390)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: javax.security.sasl.SaslException: GSS initiate 
failed [Caused by GSSException: No valid credentials provided (Mechanism level: 
Failed to find any Kerberos tgt)]
at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:688)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 

Re: Slow Performance inquiry

2020-09-09 Thread Timo Walther

Hi Hazem,

I guess your performance is mostly driven by the serialization overhead 
in this case. How do you declare your state type?


Flink comes with different serializers. Not all of them are extracted 
automatically when using reflective extraction methods:


- Note that declaring a class `Serializable` has no effect in Flink; it 
does not make Flink use its efficient serializers.
- Flink's POJO serializer only works if a default constructor is present. 
- Row needs an explicit declaration of its fields.
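
To illustrate the POJO rule in the list above, here is a minimal sketch that is not taken from this thread. `UserRatings` is a hypothetical state type, and `looksLikePojo` is a rough, stdlib-only check written for this example; it is not Flink's type extraction. It assumes the commonly documented POJO conditions (public class, public no-argument constructor, fields that are public and non-final or reachable through getters and setters) and only checks the public-field variant.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class PojoSketch {

    /** Hypothetical state type: public, static, no-arg constructor, public non-final fields. */
    public static class UserRatings {
        public long userId;
        public int[] ratedItemIds = new int[0];

        // The default constructor that the POJO rule asks for.
        public UserRatings() {}

        public UserRatings(long userId, int[] ratedItemIds) {
            this.userId = userId;
            this.ratedItemIds = ratedItemIds;
        }
    }

    /** Rough structural check only; this is NOT Flink's type extractor. */
    static boolean looksLikePojo(Class<?> type) {
        int classModifiers = type.getModifiers();
        if (!Modifier.isPublic(classModifiers)) {
            return false;
        }
        // Non-static inner classes cannot be instantiated without an outer instance.
        if (type.isMemberClass() && !Modifier.isStatic(classModifiers)) {
            return false;
        }
        try {
            type.getConstructor(); // finds public no-arg constructors only
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int m = field.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m)) {
                continue; // static and transient fields are not part of the state
            }
            // Simplification: getters/setters are ignored, fields must be public and non-final.
            if (!Modifier.isPublic(m) || Modifier.isFinal(m)) {
                return false;
            }
        }
        return true;
    }
}
```

Whether Flink actually picks its POJO serializer also depends on the field types, so in a real job it is worth inspecting the extracted type information rather than relying on a check like this.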


Regards,
Timo


On 09.09.20 13:08, Heidi Hazem Mohamed wrote:

Dear,

I am writing a Flink program (a recommender system) that needs a matrix as 
state: the rating matrix. Because the matrix is very sparse, I 
implemented a sparse binary matrix that stores only the ones rather than 
the whole matrix, to save memory. I use it as a data type and keep it in a 
ValueState, but unexpectedly the performance became terrible and the job 
became very slow. Any suggestions on what the problem might be?


My first implementation for the rating matrix state :

MapState> ratingMatrix;


The second implementation (the slow one) for rating matrix state:

ValueState userItemRatingHistory;


and this is part of the SparseBinaryMatrix class:

public class SparseBinaryMatrix implements Serializable {

    private ArrayList content;

    private int rowLength;

    private HashMap columnLabels;
    private HashMap inverseColumnLabels;

    private HashMap rowLabels;
    private HashMap inverseRowLabels;

    private enum LabelerType {Row, Column};

    public Integer colNumber;
    public Integer rowNumber;

    // This constructor initializes the matrix with zeros
    public SparseBinaryMatrix(int rows, int columns)
    {
        content = new ArrayList<>(rows);
        rowLength = columns;
        // for (int i = 0; i < rows; i++)
        //     content.add(new Row(columns));
    }



Could depending on another class (Row) lead to this terrible performance? 
Row is a class I implemented, and this is part of it:


public class Row implements Serializable {
    // This is an alternating sorted array
    private ArrayList content;
    private int length = 0;

    public Row(int numbColumns)
    {
        length = numbColumns;
        for (int i = 0; i < numbColumns; i++)
            setColumnToZero(i);
    }

    public Row(int[] initialValues)
    {
        length = initialValues.length;
        content = new ArrayList<>(length);
        for (int i = 0; i < length; i++)
            setColumn(i, initialValues[i]);
    }



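Re: flink1.9.3 on yarn job submission problem

2020-09-09 Thread Jun Zhang
This is probably related to the HA setup you configured. Try removing the HA configuration, or check it.



Best
Jun


-- Original message --
From: 宁吉浩 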

Slow Performance inquiry

2020-09-09 Thread Heidi Hazem Mohamed
Dear,

I am writing a Flink program (a recommender system) that needs a matrix as 
state: the rating matrix. Because the matrix is very sparse, I implemented a 
sparse binary matrix that stores only the ones rather than the whole matrix, 
to save memory. I use it as a data type and keep it in a ValueState, but 
unexpectedly the performance became terrible and the job became very slow. 
Any suggestions on what the problem might be?

My first implementation for the rating matrix state :

MapState> ratingMatrix;

The second implementation (the slow one) for rating matrix state:

ValueState userItemRatingHistory;

and this is part of the SparseBinaryMatrix class:


public class SparseBinaryMatrix implements Serializable {

    private ArrayList content;

    private int rowLength;

    private HashMap columnLabels;
    private HashMap inverseColumnLabels;

    private HashMap rowLabels;
    private HashMap inverseRowLabels;

    private enum LabelerType {Row, Column};

    public Integer colNumber;
    public Integer rowNumber;

    // This constructor initializes the matrix with zeros
    public SparseBinaryMatrix(int rows, int columns)
    {
        content = new ArrayList<>(rows);
        rowLength = columns;
        // for (int i = 0; i < rows; i++)
        //     content.add(new Row(columns));
    }


Could depending on another class (Row) lead to this terrible performance? 
Row is a class I implemented, and this is part of it:


public class Row implements Serializable {
    // This is an alternating sorted array
    private ArrayList content;
    private int length = 0;

    public Row(int numbColumns)
    {
        length = numbColumns;
        for (int i = 0; i < numbColumns; i++)
            setColumnToZero(i);
    }

    public Row(int[] initialValues)
    {
        length = initialValues.length;
        content = new ArrayList<>(length);
        for (int i = 0; i < length; i++)
            setColumn(i, initialValues[i]);
    }

Regards,

Heidy


Re: Performance issue associated with managed RocksDB memory

2020-09-09 Thread Stephan Ewen
Hey Juha!

I agree that we cannot reasonably expect the majority of users to
understand block sizes, arena sizes, etc. to get their application running.
So the default should be "inform when there is a problem and suggest to use
more memory." Block/arena size tuning is for the absolute experts, the 5%
super power users.

The managed memory is 128 MB by default in the mini cluster. In a
standalone session cluster setup with default config, it is 512 MB.

Best,
Stephan



On Wed, Sep 9, 2020 at 11:10 AM Juha Mynttinen 
wrote:

> Hey Yun,
>
> About the docs. I saw in the docs (
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html)
> this:
>
> "An advanced option (expert mode) to reduce the number of MemTable flushes
> in setups with many states, is to tune RocksDB’s ColumnFamily options
> (arena block size, max background flush threads, etc.) via a
> RocksDBOptionsFactory".
>
> Only after debugging this issue we're talking about, I figured that this
> snippet in the docs is probably talking about the issue I'm witnessing. I
> think there are two issues here:
>
> 1) it's hard/impossible to know what kind of performance one can expect
> from a Flink application. Thus, it's hard to know if one is suffering from
> e.g. from this performance issue, or if the system is performing normally
> (and inherently being slow).
> 2) even if one suspects a performance issue, it's very hard to find the
> root cause of the performance issue (memtable flush happening frequently).
> To find out this one would need to know what's the normal flush frequency.
>
> Also the doc says "in setups with many states". The same problem is hit
> when using just one state, but "high" parallelism (5).
>
> If the arena block size _ever_ needs  to be configured only to "fix" this
> issue, it'd be best if there _never_ was a need to modify arena block size.
> What if we forget even mentioning arena block size in the docs and focus on the
> managed memory size, since managed memory size is something the user does
> tune.
>
> You're right that a very clear WARN message could also help to cope with
> the issue. What if there was a WARN message saying that performance will be
> poor and you should increase the managed memory size? And get rid of that
> arena block size decreasing example in the docs.
>
> Also, the default managed memory size is AFAIK 128MB right now. That could
> be increased. That would get rid of this issue in many cases.
>
> Regards,
> Juha
>
> --
> *From:* Yun Tang 
> *Sent:* Tuesday, September 8, 2020 8:05 PM
> *To:* Juha Mynttinen ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hi Juha
>
> I planned to give some descriptions in Flink documentation to give such
> hints, however, it has too many details for RocksDB and we could increase
> the managed memory size to a proper value to avoid this in most cases.
> Since you have come across this and reported in user mailing list, and I
> think it's worth to give some hints in Flink documentations.
>
> When talking about your idea to sanity check the arena size, I think a
> warning should be enough as Flink seems never throw exception directly when
> the performance could be poor.
>
> Best
> Yun Tang
> --
> *From:* Juha Mynttinen 
> *Sent:* Tuesday, September 8, 2020 20:56
> *To:* Yun Tang ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hey Yun,
>
> Thanks for the detailed answer. It clarified how things work. Especially
> what is the role of RocksDB arena, and arena block size.
>
> I think there's no real-world case where it would make sense to start to a
> Flink job with RocksDB configured so that RocksDB flushes all the time,
> i.e. where the check "mutable_memtable_memory_usage() > mutable_limit_"
> is always true. The performance is just very poor and by using the same
> amount of RAM but just configuring RocksDB differently, performance could
> be e.g. 100 times better.
>
> It's very easy to hit this issue e.g. by just running a RocksDB-based
> Flink app using RocksDB with either slightly higher parallelism or with
> multiple operators. But finding out what and where the problem is very
> hard, e.g. because the issue is happening in native code and won't be
> visible even using a Java profiler.
>
> I wanted to see if it was possible to check the sanity of the arena block
> size and just make the app crash if the arena block size is too high (or
> the mutable limit too low). I came up with this 
> https://github.com/juha-mynttinen-king/flink/tree/arena_block_sanity_check
> 

Re: How to get Latency Tracking results?

2020-09-09 Thread David Anderson
Pankaj,

The Flink web UI doesn't do any visualizations of histogram metrics, so the
only way to access the latency metrics is either through the REST API or a
metrics reporter.

The REST endpoint you tried is the correct place to find these metrics in
all recent versions of Flink, but somewhere back before Flink 1.5 or 1.6
(if I recall correctly) these metrics were task metrics. So if you are
using an older version of Flink you'll need to dig deeper. I believe you'll
find them in

/jobs//vertices//subtasks/metrics

Regards,
David



On Tue, Sep 8, 2020 at 10:52 PM Pankaj Chand 
wrote:

> Hello,
>
> How do I visualize (or extract) the results for Latency Tracking for a
> Flink local cluster? I set "metrics.latency.interval 100" in the
> conf/flink-conf.yaml file, and started the cluster and
> SocketWindowWordCount job. However, I could not find the latency
> distributions anywhere in the web UI, nor are there any latency metrics in
> the Metrics dropdown box for either task.
>
> I also set "metrics.latency.granularity "operator"" in
> conf/flink-conf.yaml, but that did not help.
>
> When I tried to query the REST endpoint, I got the following output which
> did not seem to contain anything related to latency:
>
> $ curl -s
> http://localhost:8081/jobs/5d0e348eb68588646dece3654d846cf3/metrics
>
>
> [{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"numRestarts"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"}]
>
> Thanks,
>
> Pankaj
>


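Re: flink1.9.3 on yarn job submission problem

2020-09-09 Thread 宁吉浩

I submitted with the two commands below and found that the two jobs share a single Flink UI.
Cancelling either job in the UI cancels the whole cluster. That can't be normal behavior, can it?
Or is there something wrong with my submit commands?
PS: I also tried adding the -d flag, but it had no effect.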

nohup /data2/workspace/flink/flink-1.9.3/bin/flink run -m yarn-cluster \
-yn 1 -ys 3 -p 3  -ytm 2048m -ynm test \
-c aa.class aaa.jar \ &
nohup /data2/workspace/flink/flink-1.9.3/bin/flink run -m yarn-cluster \
-yn 1 -ys 3 -p 3  -ytm 2048m -ynm test2 \
-c bb.class bbb.jar \ &


--
From: zheng faaron 
Sent: Tuesday, September 8, 2020 16:09
To: user-zh@flink.apache.org 
Subject: Re: flink1.9.3 on yarn job submission problem

Hi,

On the first question: per-job mode and session mode do not share a UI at runtime.

On the second question: you can configure yarn.containers.vcores.

Best,
Faaron Zheng


From: 宁吉浩 
Sent: Monday, September 7, 2020 3:23:12 PM
To: user-zh 
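Subject: flink1.9.3 on yarn job submission problem

I chose to submit jobs with bin/flink run -m yarn cluster;
I ran into two problems:
1. Both jobs are visible in the same Flink cluster UI, even the same UI as an earlier flink-session cluster (used for testing). Is this normal?
2. I know the number of TMs can be determined by specifying the parallelism and the slots. As the image below shows, the two jobs occupy 8 YARN containers in total. How should the CPU be configured?
PS: With Spark I could set the number of executor cores directly, but I cannot find a way to do that here. Surely a TM with 8 slots should not run on just one CPU?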
[cid:__aliyun159946339265863261]


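Re: Issue follow-up

2020-09-09 Thread Dian Fu
Because you need to build different job logic depending on the value of 'data', you necessarily have to execute a job to fetch that value, so your current approach is correct.

After 1.12 is released, the following will also be possible:
1) table.limit(1).to_pandas: fetches only the first row of the table
2) table.limit(1).collect():

limit and collect will be supported in 1.12.

> On Sep 9, 2020, at 5:28 PM, whh_960101 wrote:
> 
> Answer: Yes, my requirement is to build different job logic based on the value of 'data'. How can I implement this? Please advise, thanks!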



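Re: Question about pyflink execute_insert

2020-09-09 Thread Dian Fu
On question 1: is this your requirement: first fetch the value of the field 'data' in the table (the value in the first row), then build different job logic depending on that value?

On question 2: join currently does not support duplicate field names across the two tables, see JIRA [1]. So for now you must make sure that the field names of the two tables do not overlap.

[1] https://issues.apache.org/jira/browse/FLINK-18679 

> On Sep 9, 2020, at 4:27 PM, whh_960101 wrote:
> 
> Question 1:
> I have created a Table object main_table. How can I get the value of its field 'data' and use it in a conditional statement? I could not find a suitable API.
> For example:
> if main_table.to_pandas()['data'].iloc[0] == '':  # I currently do this by converting to pandas; is there a better way?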
>   ..
> 
> 
> 
> 
> Question 2:
> full_outer_join(right, join_predicate)
> 
> Joins two Table. Similar to a SQL full outer join. The fields of the two 
> joined operations must not overlap, use alias() to rename fields if necessary.
> 
> Note
> 
> 
> 
> Both tables must be bound to the same TableEnvironment and its TableConfig 
> must have null check enabled (default).
> 
> Example:
> 
> left.full_outer_join(right, "a = b").select("a, b, d")
> Parameters
> 
> right (pyflink.table.Table) – Right table.
> 
> join_predicate (str) – The join predicate expression string.
> 
> Returns
> 
> The result table.
> 
> Return type
> 
> pyflink.table.Table
> 
> What does "The fields of the two joined operations must not overlap" mean? 
> For example, a full outer join in SQL:
> SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo
> FROM Persons
> FULL JOIN Orders
> ON Persons.Id_P=Orders.Id_P
> 
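> # The fields of the two tables in the ON clause can have the same name. Does "The fields of the two joined operations must not 
> overlap" mean that the two field names being matched must not be the same?
> 
> On 2020-09-09 15:54:35, "nicholasjiang" wrote:
>> 1. In the end only main_table was inserted. How should this case be handled, i.e. how do I insert into multiple tables?
>> For multiple sinks, the Statement Set approach is recommended:
>> statement_set = TableEnvironment.create_statement_set()
>> main_table = source.select("...")
>> sub_table = source.select("...")
>> statement_set.add_insert("main_table", main_table)
>> statement_set.add_insert("sub_table", sub_table)
>> 
>> 2. for i in range(1,20):
>>     sub_table = source.select("...%s...%d" % (str(i), i))  # my select statement is dynamically assembled SQL; the for loop produces multiple sub_tables
>>     sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
>> # Is this approach reasonable? In my tests it also failed to insert into multiple tables. Is there a solution?
>> With the Statement Set approach above, multiple sinks can insert into multiple tables.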
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Flink alert after database lookUp

2020-09-09 Thread Timo Walther
Flink's built-in JDBC connector will read the data only once. JDBC does 
not provide means to continuously monitor a database table.


It depends on the size of your database. If your parameter table is small, 
it might be sufficient to write a simple Flink connector that 
periodically reads the table and ingests the data into the streaming 
pipeline. For larger database/streaming integrations, it might be worth 
looking into Kafka's Connect API, or into Debezium, where you hook into 
the database logs to retrieve continuous data, but this might be overkill 
for your use case.
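
The periodic-read idea could be sketched roughly as follows. This is a plain-Java illustration under stated assumptions, not a Flink connector: `RefreshingThresholds`, its `loader` (standing in for a JDBC query against the parameter table) and the injected clock are hypothetical names introduced only for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Caches threshold values and reloads them once the refresh interval has elapsed. */
final class RefreshingThresholds {
    private final Supplier<Map<String, Double>> loader; // hypothetical stand-in for a DB query
    private final LongSupplier clockMillis;             // injected so the sketch can be tested
    private final long refreshIntervalMillis;

    private Map<String, Double> cached = Collections.emptyMap();
    private long lastLoadMillis;
    private boolean loaded;

    RefreshingThresholds(
            Supplier<Map<String, Double>> loader,
            LongSupplier clockMillis,
            long refreshIntervalMillis) {
        this.loader = loader;
        this.clockMillis = clockMillis;
        this.refreshIntervalMillis = refreshIntervalMillis;
    }

    /** Returns the cached threshold, reloading the whole table first if the cache is stale. */
    Double thresholdFor(String attribute) {
        long now = clockMillis.getAsLong();
        if (!loaded || now - lastLoadMillis >= refreshIntervalMillis) {
            cached = new HashMap<>(loader.get()); // copy, so later table changes need a reload
            lastLoadMillis = now;
            loaded = true;
        }
        return cached.get(attribute);
    }

    /** True if a threshold is known for the attribute and the value exceeds it. */
    boolean breaches(String attribute, double value) {
        Double limit = thresholdFor(attribute);
        return limit != null && value > limit;
    }
}
```

The refresh interval trades database load against how quickly an updated threshold becomes visible; the same shape could live inside a user function, with the loader replaced by the actual query.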


The link to the streaming pattern slides that I sent you should work 
after registration.


Regards,
Timo


On 09.09.20 09:49, s_penakalap...@yahoo.com wrote:


Hi Timo,

Thank you for the suggestions.

I see now that neither the Process function nor the CEP approach will fit. 
If I follow the third approach and stream the values from the database, is 
it possible to stream the data continuously?


With both of the approaches below, I only get a one-time load, not a 
continuous stream:
- Using JDBCInputFormat: this executes the query only once, so it is not 
streaming data. Iterating over the source may only iterate over the data 
already fetched.
- Using a RichFlatMapFunction: if I connect to the DB in open(), this is 
again a one-time load. If I connect to the database in flatMap(), it 
leads to multiple hits on the database.


I request your help on how to stream the data continuously and, if 
possible, sample source code for streaming from a database. Please help, 
I am badly stuck.


In the mail, I see you asked me to register. Are you referring to a 
training or to some other registration?



Regards,
Sunitha.
On Tuesday, September 8, 2020, 08:19:49 PM GMT+5:30, Timo Walther 
 wrote:



Hi Sunitha,

what you are describing is a typical streaming enrichment. We need to
enrich the stream with some data from a database. There are different
strategies to handle this:

1) You are querying the database for every record. This is usually not
what you want because it would slow down your pipeline due to the
communication latencies to your database. It would also put a lot of
pressure on the database in general.

2) You only query database from time to time and store the latest value
in a ProcessFunction ValueState or MapState.

3) You stream in the values as well and use connect() [1].

In any case, I think CEP might not be useful for this case. If you
really want to do option 1, it might make sense to also checkout the SQL
API of Flink because it offers different kind of joins with very good
abstraction. `Join with a Temporal Table` offers a JDBC connector for
lookups in your database.

If you like to use DataStream API, I would also recommend the Pattern
slides here [3] (unfortunately you have to register first).

Regards,
Timo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/etl.html#connected-streams
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
[3] https://training.ververica.com/decks/patterns/


On 07.09.20 17:25, s_penakalap...@yahoo.com 
 wrote:

 > Hi All,
 >
 > I am new to Flink, request your help!!!
 >
 > My scenario :
 > 1> we receive Json messages at a very high frequency like 10,000
 > messages / second
 > 2> we need to raise an Alert for a particular user if there is any
 > breach in threshold value against each attribute in Json.
 > 3> These threshold values are part of my database table and can be
 > frequently updated by different user.
 > 4> In realtime I would like to know how to get latest data from the
 > database.
 >
 > I tried using Flink CEP Pattern approach to generate alert. I would like
 > to get some inputs on how I can implement the realtime lookup tables in
 > Flink Java while monitoring alert, any sample code reference.
 >
 > Also for such scenarios do you recommend to use Flink CEP approach or we
 > need to use Process function approach.
 >
 >
 > Regards,
 > Sunitha.





Re: Performance issue associated with managed RocksDB memory

2020-09-09 Thread Juha Mynttinen
Hey Yun,

About the docs. I saw in the docs 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html)
 this:

"An advanced option (expert mode) to reduce the number of MemTable flushes in 
setups with many states, is to tune RocksDB’s ColumnFamily options (arena block 
size, max background flush threads, etc.) via a RocksDBOptionsFactory".

Only after debugging this issue we're talking about, I figured that this 
snippet in the docs is probably talking about the issue I'm witnessing. I think 
there are two issues here:

1) it's hard/impossible to know what kind of performance one can expect from a 
Flink application. Thus, it's hard to know if one is suffering from e.g. 
this performance issue, or if the system is performing normally (and is 
inherently slow).
2) even if one suspects a performance issue, it's very hard to find the root 
cause of the performance issue (memtable flush happening frequently). To find 
this out, one would need to know what the normal flush frequency is.

Also the doc says "in setups with many states". The same problem is hit when 
using just one state, but "high" parallelism (5).

If the arena block size _ever_ needs  to be configured only to "fix" this 
issue, it'd be best if there _never_ was a need to modify arena block size. 
What if we forget even mentioning arena block size in the docs and focus on the 
managed memory size, since managed memory size is something the user does tune.

You're right that a very clear WARN message could also help to cope with the 
issue. What if there was a WARN message saying that performance will be poor 
and you should increase the managed memory size? And get rid of that arena 
block size decreasing example in the docs.

Also, the default managed memory size is AFAIK 128MB right now. That could be 
increased. That would get rid of this issue in many cases.

Regards,
Juha


From: Yun Tang 
Sent: Tuesday, September 8, 2020 8:05 PM
To: Juha Mynttinen ; user@flink.apache.org 

Subject: Re: Performance issue associated with managed RocksDB memory

Hi Juha

I planned to give some descriptions in Flink documentation to give such hints, 
however, it has too many details for RocksDB and we could increase the managed 
memory size to a proper value to avoid this in most cases.
Since you have come across this and reported in user mailing list, and I think 
it's worth to give some hints in Flink documentations.

When talking about your idea to sanity check the arena size, I think a warning 
should be enough as Flink seems never throw exception directly when the 
performance could be poor.

Best
Yun Tang

From: Juha Mynttinen 
Sent: Tuesday, September 8, 2020 20:56
To: Yun Tang ; user@flink.apache.org 
Subject: Re: Performance issue associated with managed RocksDB memory

Hey Yun,

Thanks for the detailed answer. It clarified how things work. Especially what 
is the role of RocksDB arena, and arena block size.

I think there's no real-world case where it would make sense to start a 
Flink job with RocksDB configured so that RocksDB flushes all the time, i.e. 
where the check "mutable_memtable_memory_usage() > mutable_limit_" is always 
true. The performance is just very poor, and by using the same amount of RAM 
but just configuring RocksDB differently, performance could be e.g. 100 times 
better.

It's very easy to hit this issue, e.g. by just running a RocksDB-based Flink 
app with either slightly higher parallelism or with multiple operators. But 
finding out what the problem is and where it lies is very hard, e.g. 
because the issue is happening in native code and won't be visible even using a 
Java profiler.

I wanted to see if it was possible to check the sanity of the arena block size 
and just make the app crash if the arena block size is too high (or the mutable 
limit too low). I came up with this: 
https://github.com/juha-mynttinen-king/flink/tree/arena_block_sanity_check 
The code calculates the same parameters that are calculated in RocksDB and 
throws if the arena block size is higher than the "mutable limit".

I did a few quick tests and the code seems to work, with small parallelism my 
app works, but with higher parallelism (when the app would flush all the time), 
it crashes with message like "arenaBlockSize 8388608 < mutableLimit 7340032 
(writeBufferSize 67108864 arenaBlockSizeConfigured 0 defaultArenaBlockSize 
8388608 writeBufferManagerCapacity 8388608). RocksDB would flush memtable 
constantly. Refusing to start. You can 1) make arena block size smaller, 2) 
decrease parallelism (if possible), 3) increase managed memory"
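
The arithmetic behind such a check can be sketched as below. This is an illustration, not the code in the linked branch: the class and method names are hypothetical, and the factors 7/8 (mutable limit relative to the write buffer manager capacity) and 1/8 (default arena block size relative to the write buffer size) are assumptions inferred from the numbers in the message above, where 8388608 * 7 / 8 = 7340032 and 67108864 / 8 = 8388608. The sketch returns a warning text instead of throwing, so the caller decides whether to log it or refuse to start.

```java
import java.util.Optional;

/** Hypothetical sanity check for the arena block size versus the mutable memtable limit. */
final class ArenaBlockSanityCheck {

    /** Assumption: the mutable limit is 7/8 of the write buffer manager capacity. */
    static long mutableLimit(long writeBufferManagerCapacity) {
        return writeBufferManagerCapacity * 7 / 8;
    }

    /** Assumption: an unset (0) arena block size defaults to 1/8 of the write buffer size. */
    static long effectiveArenaBlockSize(long writeBufferSize, long arenaBlockSizeConfigured) {
        return arenaBlockSizeConfigured > 0 ? arenaBlockSizeConfigured : writeBufferSize / 8;
    }

    /** Returns a warning if a single arena block already exceeds the mutable limit. */
    static Optional<String> check(
            long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity) {
        long arenaBlockSize = effectiveArenaBlockSize(writeBufferSize, arenaBlockSizeConfigured);
        long limit = mutableLimit(writeBufferManagerCapacity);
        if (arenaBlockSize > limit) {
            return Optional.of(
                    "arenaBlockSize " + arenaBlockSize + " > mutableLimit " + limit
                            + ": RocksDB would flush the memtable constantly. Consider a smaller"
                            + " arena block size, lower parallelism, or more managed memory.");
        }
        return Optional.empty();
    }
}
```

With the values from the message (write buffer size 67108864, no configured arena block size, capacity 8388608) the check reports a problem; with a capacity of 67108864 it does not.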


Re: Performance issue associated with managed RocksDB memory

2020-09-09 Thread Stephan Ewen
Thanks for driving this, it is a great find and a nice proposal for a
solution.

I generally really like the idea of the block size sanity checker.

I would also suggest to first go with logging a big fat WARNING rather than
crashing the job. Crashing the job like this would be an unrecoverable
failure, which we typically try to avoid.
What do you think about that, @Juha? Would you find that prominent enough?

Best,
Stephan


On Tue, Sep 8, 2020 at 7:05 PM Yun Tang  wrote:

> Hi Juha
>
> I planned to give some descriptions in Flink documentation to give such
> hints, however, it has too many details for RocksDB and we could increase
> the managed memory size to a proper value to avoid this in most cases.
> Since you have come across this and reported in user mailing list, and I
> think it's worth to give some hints in Flink documentations.
>
> When talking about your idea to sanity check the arena size, I think a
> warning should be enough as Flink seems never throw exception directly when
> the performance could be poor.
>
> Best
> Yun Tang
> --
> *From:* Juha Mynttinen 
> *Sent:* Tuesday, September 8, 2020 20:56
> *To:* Yun Tang ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hey Yun,
>
> Thanks for the detailed answer. It clarified how things work, especially
> the role of the RocksDB arena and the arena block size.
>
> I think there's no real-world case where it would make sense to start a
> Flink job with RocksDB configured so that RocksDB flushes all the time,
> i.e. where the check "mutable_memtable_memory_usage() > mutable_limit_"
> is always true. The performance is just very poor, and by using the same
> amount of RAM but just configuring RocksDB differently, performance could
> be e.g. 100 times better.
>
> It's very easy to hit this issue, e.g. by just running a RocksDB-based
> Flink app with either slightly higher parallelism or with
> multiple operators. But finding out what and where the problem is is very
> hard, e.g. because the issue is happening in native code and won't be
> visible even using a Java profiler.
>
> I wanted to see if it was possible to check the sanity of the arena block
> size and just make the app crash if the arena block size is too high (or
> the mutable limit too low). I came up with this
> https://github.com/juha-mynttinen-king/flink/tree/arena_block_sanity_check.
> The code calculates the same parameters that are calculated in RocksDB and
> throws if the arena block size is higher than the "mutable limit".
>
> I did a few quick tests and the code seems to work, with small parallelism
> my app works, but with higher parallelism (when the app would flush all the
> time), it crashes with message like "arenaBlockSize 8388608 < mutableLimit
> 7340032 (writeBufferSize 67108864 arenaBlockSizeConfigured 0
> defaultArenaBlockSize 8388608 writeBufferManagerCapacity 8388608). RocksDB
> would flush memtable constantly. Refusing to start. You can 1) make arena
> block size smaller, 2) decrease parallelism (if possible), 3) increase
> managed memory"
>
> Regards,
> Juha
>
> --
> *From:* Yun Tang 
> *Sent:* Friday, August 28, 2020 6:58 AM
> *To:* Juha Mynttinen ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hi Juha
>
> Thanks for your enthusiasm in digging into this problem, and sorry for
> jumping into this thread late to share something about the write buffer
> manager in RocksDB.
>
> First of all, the reason you see poor performance is that the write
> buffer manager has been assigned a much lower limit (due to the small
> managed memory size on that slot) than it actually needs. The competition
> for memory between different column families leads RocksDB to switch the
> active memtable to an immutable memtable early, which hurts performance
> because it increases write amplification.
>
> To keep memory from exceeding the limit, the write buffer manager decides
> whether to flush the memtable early, which is the statement you found:
> "mutable_memtable_memory_usage() > mutable_limit_" [1]. The memory usage
> includes arena blocks that are allocated but not yet used.
> Speaking of the arena, the memory allocator in RocksDB, I need to
> correct one thing in your thread: the block cache does not allocate any
> memory; all memory is allocated from the arena.
>
> The core idea of how RocksDB limits memory usage: the arena allocates
> memory, the write buffer manager decides when to switch memtables to
> control the active memory usage, and the write buffer manager also accounts
> its allocated memory in the cache. The underlying block cache evicts memory
> based on the accounting from the write buffer manager and the cached
> blocks, filters and indexes.
>
> By default, arena_block_size is not configured, and it would be 1/8 of
> write buffer size [2]. And the default write 

Re: flink-cdc sink mysql issue

2020-09-09 Thread wdmcode
Hi 杨帅统,
For real-time MySQL-to-MySQL data synchronization you can also use otter, open-sourced by Alibaba:
https://github.com/alibaba/otter/


From: 杨帅统
Sent: Wednesday, September 9, 2020 4:49 PM
To: user-zh@flink.apache.org
Subject: flink-cdc sink mysql issue

Our company wants to synchronize data from MySQL database A to database B in real time, and I am wondering whether this is feasible with the CDC feature of Flink 1.11.
For testing I defined a CDC source table and a sink table:
CREATE TABLE pvuv_test (
  id INT,
  dt STRING,
  pv STRING,
  uv STRING ,
  proc_time AS PROCTIME() -- this field must be specified when using a dimension table
) WITH (
  'connector' = 'mysql-cdc', -- connector
  'hostname' = 'localhost',   -- MySQL host
  'port' = '3306',  -- MySQL port
  'username' = 'root',  -- MySQL username
  'password' = 'rootzs',  -- MySQL password
  'database-name' = 'etc_demo', -- database name
  'table-name' = 'puuv_test'
);
CREATE TABLE pvuv_test_back (
  id INT,
  dt STRING,
  pv STRING,
  uv STRING ,
  proc_time AS PROCTIME() -- this field must be specified when using a dimension table
) WITH (
  'connector' = 'mysql-cdc', -- connector
  'hostname' = 'localhost',   -- MySQL host
  'port' = '3306',  -- MySQL port
  'username' = 'root',  -- MySQL username
  'password' = 'rootzs',  -- MySQL password
  'database-name' = 'etc_demo', -- database name
  'table-name' = 'puuv_test_back'
);
However, when I execute the following statement through the SQL Client, it fails:
INSERT INTO pvuv_test_back
SELECT * FROM pvuv_test;
-
The error message is as follows:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for 
identifier 'mysql-cdc' that implements 
'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
Available factory identifiers are:
blackhole
elasticsearch-6
kafka
print


The mysql-cdc jar is already in the Flink/lib directory, so I don't know where the problem is.


Finally, for this MySQL-to-MySQL real-time synchronization requirement, does anyone have other solutions or ideas? Thanks.



flink-cdc sink mysql issue

2020-09-09 Thread 杨帅统
Our company wants to synchronize data from MySQL database A to database B in real time, and I am wondering whether this is feasible with the CDC feature of Flink 1.11.
For testing I defined a CDC source table and a sink table:
CREATE TABLE pvuv_test (
  id INT,
  dt STRING,
  pv STRING,
  uv STRING ,
  proc_time AS PROCTIME() -- this field must be specified when using a dimension table
) WITH (
  'connector' = 'mysql-cdc', -- connector
  'hostname' = 'localhost',   -- MySQL host
  'port' = '3306',  -- MySQL port
  'username' = 'root',  -- MySQL username
  'password' = 'rootzs',  -- MySQL password
  'database-name' = 'etc_demo', -- database name
  'table-name' = 'puuv_test'
);
CREATE TABLE pvuv_test_back (
  id INT,
  dt STRING,
  pv STRING,
  uv STRING ,
  proc_time AS PROCTIME() -- this field must be specified when using a dimension table
) WITH (
  'connector' = 'mysql-cdc', -- connector
  'hostname' = 'localhost',   -- MySQL host
  'port' = '3306',  -- MySQL port
  'username' = 'root',  -- MySQL username
  'password' = 'rootzs',  -- MySQL password
  'database-name' = 'etc_demo', -- database name
  'table-name' = 'puuv_test_back'
);
However, when I execute the following statement through the SQL Client, it fails:
INSERT INTO pvuv_test_back
SELECT * FROM pvuv_test;
-
The error message is as follows:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for 
identifier 'mysql-cdc' that implements 
'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
Available factory identifiers are:
blackhole
elasticsearch-6
kafka
print


The mysql-cdc jar is already in the Flink/lib directory, so I don't know where the problem is.


Finally, for this MySQL-to-MySQL real-time synchronization requirement, does anyone have other solutions or ideas? Thanks.

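flink table Kafka reconnection issue

2020-09-09 Thread marble.zh...@coinflex.com.INVALID
Hello.
When the connector is connected to Kafka and a message causes an exception, the task stops and does not
reconnect automatically. I looked at the Kafka connector configuration and found no setting for this. Is there any reconnection mechanism? Thanks.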



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: pyflink execute_insert question

2020-09-09 Thread whh_960101
Question 1:
I have already created a Table object main_table. How can I get the value of its field 'data' and use it in a conditional statement? I could not find a suitable API.
For example:
if main_table.to_pandas()['data'].iloc[0] == '': # currently I do this by converting to pandas; is there a better way?
   ..


Question 2:
full_outer_join(right, join_predicate)

Joins two Table. Similar to a SQL full outer join. The fields of the two joined 
operations must not overlap, use alias() to rename fields if necessary.

Note: Both tables must be bound to the same TableEnvironment and its TableConfig must 
have null check enabled (default).

Example:

>>> left.full_outer_join(right,"a = b").select("a, b, d")

Parameters
right (pyflink.table.Table) – Right table.
join_predicate (str) – The join predicate expression string.

Returns
The result table.

Return type
pyflink.table.Table

What does "The fields of the two joined operations must not overlap" mean? For example, a full outer join in SQL:
SELECT Persons.LastName, Persons.FirstName, Orders.OrderNo
FROM Persons
FULL JOIN Orders
ON Persons.Id_P=Orders.Id_P

Here the fields of the two tables in the ON clause may have the same name. Does "The fields of the two joined operations must not 
overlap" mean that the two fields being matched must not have the same name?

At 2020-09-09 15:54:35, "nicholasjiang"  wrote:
>1. In the end only main_table was inserted. How should this be handled, i.e. how do I insert into multiple tables?
>For multiple sinks, the Statement Set approach is recommended:
>statement_set = st_env.create_statement_set()  # called on the TableEnvironment instance
>main_table = source.select("...")
>sub_table = source.select("...")
>statement_set.add_insert("main_table", main_table)
>statement_set.add_insert("sub_table", sub_table)
>statement_set.execute()  # submits all added inserts together
>
>2.for i in range(1,20):
> sub_table = source.select("...%s...%d"
>%(str(i),i)) # my select statement is dynamically assembled SQL; multiple sub_tables are obtained in a for loop
>
>sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
># Is this approach reasonable? In testing it also fails to insert into multiple tables. Is there a solution?
>With the Statement Set approach above, multiple sinks can insert into multiple tables.
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


flink-sql????????on kafka??flink table??????select????flink table????????????group id??????

2020-09-09 Thread ??????
??kafka??flink 
table??select??selectgroup
 id??

flink sql 1.11.1: how to insert data for only one column family into HBase

2020-09-09 Thread 大罗
Hi, I have run into a problem. I defined an HBase connector table in the Hive catalog as follows:

CREATE TABLE t_sems_second (
  rowkey string,
  status row (val VARCHAR, dqf VARCHAR),
  analog row (val VARCHAR, dqf VARCHAR)
) WITH (
 'connector' = 'hbase-1.4',
 'table-name' = 't_sems_second',
 'zookeeper.quorum' =
'dev-hadoop-node-c:2181,dev-hadoop-node-d:2181,dev-hadoop-node-e:2181'
);


Then I ran SQL in flink-sql to insert data, which worked without problems:
insert into t_sems_second (rowkey, analog, status) values( 'row2', row('a',
'100'), row('b', '200') );

If I set the contents of the second column family to empty strings, that also works:
insert into t_sems_second (rowkey, analog, status) values( 'row3', row('c',
'300'), row('', '') );

But an HBase scan then shows the empty strings, as below:
hbase(main):019:0> scan 't_sems_second'
ROW     COLUMN+CELL
 row2   column=analog:dqf, timestamp=1599639282193, value=200
 row2   column=analog:val, timestamp=1599639282193, value=b
 row2   column=status:dqf, timestamp=1599639282193, value=100
 row2   column=status:val, timestamp=1599639282193, value=a
* row3   column=analog:dqf, timestamp=1599639292413, value=
 row3   column=analog:val, timestamp=1599639292413, value= *
 row3   column=status:dqf, timestamp=1599639292413, value=300
 row3   column=status:val, timestamp=1599639292413, value=c
2 row(s)
Took 0.1184 seconds
hbase(main):020:0> 

Finally, my question is how to insert data only into the column family analog, with a statement like:
insert into t_sems_second (rowkey, analog) select 'row1', row('a', '100')  ;
or:
insert into t_sems_second (rowkey, analog, status) values( 'row2', row('a',
'100'), row(null, NULL) );

But these turn out not to work.

So, does the Flink SQL 1.11 HBase connector support inserting data into only one of the column families,
or is the only way to meet my requirement to define two tables, each containing a single column family?







--
Sent from: http://apache-flink.147419.n8.nabble.com/


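Re: flink sql client: how to execute multiple SQL statements at once

2020-09-09 Thread nicholasjiang
Currently the Flink SQL Client does not support executing multiple SQL statements at once. You can write a
program that reads the SQL file and uses the Table API's createStatementSet to execute multiple SQL statements together.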
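
The "read the SQL file" part of that suggestion could be sketched in plain Python as below. This is only an illustration and not part of Flink: the helper name split_sql_statements is made up, and it assumes statements are separated by semicolons, with single-quoted string literals and "--" line comments as the only places where a semicolon must be ignored.

```python
def split_sql_statements(script: str) -> list:
    """Split an SQL script into statements on ';'.

    Semicolons inside single-quoted literals are kept, and '--' line
    comments are dropped (assumption: no other comment or quoting syntax).
    """
    statements, current = [], []
    in_quote = False
    i = 0
    while i < len(script):
        ch = script[i]
        if in_quote:
            current.append(ch)
            if ch == "'":
                in_quote = False  # a doubled '' simply re-opens the literal
        elif ch == "'":
            in_quote = True
            current.append(ch)
        elif script.startswith("--", i):
            # Skip the comment up to (not including) the end of the line.
            end = script.find("\n", i)
            i = len(script) if end == -1 else end
            continue
        elif ch == ";":
            stmt = "".join(current).strip()
            if stmt:
                statements.append(stmt)
            current = []
        else:
            current.append(ch)
        i += 1
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements


if __name__ == "__main__":
    script = (
        "INSERT INTO a SELECT 'x;y' FROM t; -- first; statement\n"
        "INSERT INTO b SELECT * FROM t;\n"
    )
    for stmt in split_sql_statements(script):
        print(stmt)
```

Each INSERT statement returned by the helper would then be added to the statement set created with createStatementSet (for example via addInsertSql in the Java Table API) and executed together.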



--
Sent from: http://apache-flink.147419.n8.nabble.com/


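Re: pyflink execute_insert question

2020-09-09 Thread nicholasjiang
1. In the end only main_table was inserted. How should this be handled, i.e. how do I insert into multiple tables?
For multiple sinks, the Statement Set approach is recommended:
statement_set = st_env.create_statement_set()  # called on the TableEnvironment instance
main_table = source.select("...")
sub_table = source.select("...")
statement_set.add_insert("main_table", main_table)
statement_set.add_insert("sub_table", sub_table)
statement_set.execute()  # submits all added inserts together

2.for i in range(1,20):
 sub_table = source.select("...%s...%d"
%(str(i),i)) # my select statement is dynamically assembled SQL; multiple sub_tables are obtained in a for loop

sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
# Is this approach reasonable? In testing it also fails to insert into multiple tables. Is there a solution?
With the Statement Set approach above, multiple sinks can insert into multiple tables.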



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink alert after database lookUp

2020-09-09 Thread s_penakalap...@yahoo.com
 
Hi Timo,
Thank you for the suggestions.
I see now that neither the ProcessFunction nor the CEP approach will fit. If I
follow the third approach and stream the values from the database, is it possible
to stream the data continuously?
With both of the approaches below I only see a one-time load, not a continuous
stream:
1) Using JDBCInputFormat: this executes the query only once, so it will not be
streaming data. When we try to iterate over the source, it may iterate only over
the data already fetched.
2) Using RichFlatMapFunction: if I connect to the DB in open(), this is again a
one-time load. If I connect to the database in flatMap(), it leads to multiple
hits to the database.
I request your help on how to stream the data continuously, if possible with
sample source code for streaming from a database. Please help, I am badly stuck.
In the mail, I see you asked me to register. Are you referring to a training
here, or to some other registration?

Regards,
Sunitha.

On Tuesday, September 8, 2020, 08:19:49 PM GMT+5:30, Timo
Walther  wrote:
 
Hi Sunitha,

what you are describing is a typical streaming enrichment. We need to 
enrich the stream with some data from a database. There are different 
strategies to handle this:

1) You are querying the database for every record. This is usually not 
what you want because it would slow down your pipeline due to the 
communication latency to your database. It would also put a lot of 
pressure on the database in general.

2) You only query database from time to time and store the latest value 
in a ProcessFunction ValueState or MapState.

3) You stream in the values as well and use connect() [1].
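
Option 2 can be sketched in plain Python as follows. This is not Flink code: the class stands in for what a ProcessFunction would keep in ValueState or MapState, and the names RefreshingThresholds, load and breaches are made up for illustration. The load callable represents the database query, and every attribute in a record is assumed to have a threshold.

```python
import time
from typing import Callable, Dict, List


class RefreshingThresholds:
    """Caches threshold values and reloads them at most once per interval."""

    def __init__(self,
                 load: Callable[[], Dict[str, float]],
                 refresh_interval_s: float,
                 clock: Callable[[], float] = time.monotonic):
        self._load = load                # hypothetical stand-in for the database query
        self._interval = refresh_interval_s
        self._clock = clock
        self._values = load()            # initial load
        self._loaded_at = clock()

    def get(self, attribute: str) -> float:
        now = self._clock()
        if now - self._loaded_at >= self._interval:
            # The cached copy is too old: query the database again.
            self._values = self._load()
            self._loaded_at = now
        return self._values[attribute]


def breaches(record: Dict[str, float], thresholds: RefreshingThresholds) -> List[str]:
    """Return the attributes of one record whose value exceeds its threshold."""
    return [name for name, value in record.items() if value > thresholds.get(name)]


if __name__ == "__main__":
    cache = RefreshingThresholds(lambda: {"temperature": 50.0}, refresh_interval_s=60.0)
    print(breaches({"temperature": 55.0}, cache))
```

The database is hit once per refresh interval instead of once per record, at the cost of threshold updates becoming visible only after the next refresh.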

In any case, I think CEP might not be useful for this case. If you 
really want to do option 1, it might make sense to also check out the SQL 
API of Flink because it offers different kinds of joins with very good 
abstraction. `Join with a Temporal Table` offers a JDBC connector for 
lookups in your database [2].

If you would like to use the DataStream API, I would also recommend the Pattern 
slides here [3] (unfortunately you have to register first).

Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/etl.html#connected-streams
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html
[3] https://training.ververica.com/decks/patterns/


On 07.09.20 17:25, s_penakalap...@yahoo.com wrote:
> Hi All,
> 
> I am new to Flink, request your help!!!
> 
> My scenario :
> 1> we receive Json messages at a very high frequency like 10,000 
> messages / second
> 2> we need to raise an Alert for a particular user if there is any 
> breach in threshold value against each attribute in Json.
> 3> These threshold values are part of my database table and can be 
> frequently updated by different user.
> 4> In realtime I would like to know how to get latest data from the 
> database.
> 
> I tried using Flink CEP Pattern approach to generate alert. I would like 
> to get some inputs on how I can implement the realtime lookup tables in 
> Flink Java while monitoring alert, any sample code reference.
> 
> Also for such scenarios do you recommend to use Flink CEP approach or we 
> need to use Process function approach.
> 
> 
> Regards,
> Sunitha.

  

flink sql 1.11.1 insert data to hive from kafka split into two jobs

2020-09-09 Thread 大罗
Hi, I have run into a problem. In my code I use Flink SQL 1.11 to insert data from Kafka into Hive tables. The process is as follows:

First, read JSON array strings from Kafka, for example [{"pid":"a", "val":1, "data_type": 1, "app_type"
:2},
{"pid":"a", "val":1, "data_type": 1, "app_type" :2}]

Then use flatMap to convert this data into single objects (runDataStream): {"pid":"a", "val":1, "data_type": 1,
"app_type" :2}

Write runDataStream to Redis: runDataStream.addSink(new CustomRedisSink())

Then create a temporary view, for example:
tableEnv.createTemporaryView("kafkaT1", runDataSingleOutputStreamOperator,
$("pid"),  $("val"), $("app_type"), $("data_type"));

Next, define the different SQL statements, for example:
String sql1 = "insert into ods_data_10 select pid, val from kafkaT1 where data_type = 1
and app_type = 0";
String sql2 = "insert into ods_data_11 select pid, val from kafkaT1 where data_type = 1
and app_type = 1";
String sql3 = "insert into ods_data_01 select pid, val from kafkaT1 where data_type = 0
and app_type = 1";
String sql4 = "insert into ods_data_00 select pid, val from kafkaT1 where data_type = 0
and app_type = 0";

Run them with a StatementSet:
StatementSet ss = tableEnv.createStatementSet();
ss.addInsertSql(sql1);
ss.addInsertSql(sql2);
ss.addInsertSql(sql3);
ss.addInsertSql(sql4);

Finally, execute the job:
env.execute(jobName);

Everything works and no error is reported, but in the web UI two jobs were submitted, as shown in the figure:

 

The job "EconStreamingToHiveHbaseRedisJob" should correspond to the write to Redis (call its job ID jobA).

The job "insert-into_myhive.dw.ods_analog_sems
***" should correspond to the writes to the 4 tables (call its job ID jobB), as shown in the figure:

 

The operator at the top is defined as follows:
Source: Custom Source -> Map -> Flat Map -> Filter ->
SourceConversion(table=[myhive.dw.kafkaT1], fields=[pid, dqf, val, et,
run_data_type]) -> 
(Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
_UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
AS EXPR$5], where=[((run_data_type = 0) AND NOT((pid LIKE
_UTF-16LE'BP.%')))]) -> StreamingFileWriter, 
Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
_UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
AS EXPR$5], where=[((run_data_type = 1) AND NOT((pid LIKE
_UTF-16LE'BP.%')))]) -> StreamingFileWriter, 
Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
_UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
AS EXPR$5], where=[((run_data_type = 0) AND (pid LIKE _UTF-16LE'BP.%'))]) ->
StreamingFileWriter, 
Calc(select=[pid, val, et, dqf, ((et / 1000) FROM_UNIXTIME
_UTF-16LE'-MM-dd') AS EXPR$4, ((et / 1000) FROM_UNIXTIME _UTF-16LE'HH')
AS EXPR$5], where=[((run_data_type = 1) AND (pid LIKE _UTF-16LE'BP.%'))]) ->
StreamingFileWriter)

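When I want to stop these jobs, for example with "./bin/flink stop -m :8081 jobA",
a savepoint is generated, e.g. "Suspending job "395c1f468e65b6e29abb58c27cb80bdc" with a
savepoint."
Stopping jobB likewise generates a savepoint.

My question is: is there a required order between stopping jobA and jobB, and which savepoint should I use to guarantee a smooth restart of the jobs?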




--
Sent from: http://apache-flink.147419.n8.nabble.com/


localtimestamp??current_timestamp????mysql????????

2020-09-09 Thread xuzh
Dear all??



 CREATE TABLE sink (
 id INT,
 prod_nm STRING,
 dtm timestamp,
 primary key(id) NOT ENFORCED -- '??'
 )
 WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.0.0.0:3306/rs_report?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'sink',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'dps',
  'password' = 'dps'
 );

insert into sink
select id,prod_nm,current_timestamp from product;



mysql ??dtm 
??8??21

insert into sink
select id,prod_nm,localtimestamp from product;



mysql ??dtm ??8.

Re: pyflink execute_insert question

2020-09-09 Thread Dian Fu
These two look like the same problem. It is supported in 1.11; take a look at TableEnvironment.create_statement_set(): 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/table_environment.html#executeexplain-jobs

> On Sep 9, 2020, at 11:31 AM, whh_960101  wrote:
> 
> Hello, my pyflink code is below, and I have the following two questions:
> 1.
> source  = st_env.from_path('source') 
> # st_env is a StreamTableEnvironment; source is the Kafka source
> main_table = source.select("...")
> sub_table = source.select("...")
> main_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
> sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
> 
> 
> In the end only main_table was inserted. How should this be handled, i.e. how do I insert into multiple tables?
> 
> 
> 2.
> for i in range(1,20):
> sub_table = source.select("...%s...%d" 
> %(str(i),i)) # my select statement is dynamically assembled SQL; multiple sub_tables are obtained in a for loop
> 
> sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
>  # Is this approach reasonable? In testing it also fails to insert into multiple tables. Is there a solution?
> 
> 
> I hope you can help answer the two questions above. Thanks!
> 
> 
> 
> 
> 



Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-09 Thread Yang Wang
> The job sub directory will be cleaned up when the job finished/canceled/failed.
Since we can submit multiple jobs to a Flink session, what I mean is that
when a job reaches a terminal state, its sub node (e.g.
/flink/application_/running_job_registry/4d255397c7aeb5327adb567238c983c1)
on ZooKeeper will be cleaned up. But the root
directory (/flink/application_/) still exists.


Your current case is different (per-job cluster). I think we
need to figure out why the only
running job reached a terminal state. For example, the restart attempts
may have been exhausted, in which case you
could find the following log in your JobManager log.

"org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy"
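
As a toy model (plain Python, not Flink code) of why an exhausted restart budget ends in a terminal state: the function name and the returned state strings are made up for illustration, and the model only counts failures against a maximum number of restart attempts.

```python
def final_state(num_failures: int, max_restart_attempts: int) -> str:
    """Toy model: each failure consumes one restart attempt.

    A failure that arrives after all attempts are used is terminal,
    which is the point at which the HA data would be cleaned up.
    """
    restarts_used = 0
    for _ in range(num_failures):
        if restarts_used >= max_restart_attempts:
            return "FAILED"      # terminal state, no restart left
        restarts_used += 1       # the job is restarted and keeps running
    return "RUNNING"


if __name__ == "__main__":
    print(final_state(10, 10))   # prints RUNNING
    print(final_state(11, 10))   # prints FAILED
    print(final_state(1, 0))     # prints FAILED
```

With zero allowed attempts, which is how this model represents a job without a restart strategy, the very first failure is already terminal.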


Best,
Yang




Cristian  wrote on Wed, Sep 9, 2020 at 11:26 AM:

> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
>
> What does this mean?
>
> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the
> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>
> The only cases where I expect Flink to clean up the checkpoint data from
> ZK is when I explicitly stop or cancel the job (in those cases the job
> manager takes a savepoint before cleaning up zk and finishing the cluster).
>
> Which is not the case here. Flink was on autopilot here and decided to
> wipe my poor, good checkpoint metadata as the logs show.
>
> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>
> AFAIK, the HA data, including Zookeeper meta data and real data on DFS,
> will only be cleaned up
> when the Flink cluster reached terminated state.
>
> So if you are using a session cluster, the root cluster node on Zk will be
> cleaned up after you manually
> stop the session cluster. The job sub directory will be cleaned up when
> the job finished/canceled/failed.
>
> If you are using a job/application cluster, once the only running job
> finished/failed, all the HA data will
> be cleaned up. I think you need to check the job restart strategy you have
> set. For example, the following
> configuration will make the Flink cluster terminated after 10 attempts.
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 10
>
>
> Best,
> Yang
>
> Cristian  wrote on Wed, Sep 9, 2020 at 12:28 AM:
>
>
> I'm using the standalone script to start the cluster.
>
> As far as I can tell, it's not easy to reproduce. We found that zookeeper
> lost a node around the time this happened, but all of our other 75 Flink
> jobs which use the same setup, version and zookeeper, didn't have any
> issues. They didn't even restart.
>
> So unfortunately I don't know how to reproduce this. All I know is I can't
> sleep. I have nightmares where my precious state is deleted. I wake up
> crying and quickly start manually savepointing all jobs just in case,
> because I feel the day of reckoning is near. Flinkpocalypse!
>
> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>
> Thanks a lot for reporting this problem here Cristian!
>
> I am not super familiar with the involved components, but the behavior you
> are describing doesn't sound right to me.
> Which entrypoint are you using? This is logged at the beginning, like
> this: "2020-09-08 14:45:32,807 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>
> Do you know by chance if this problem is reproducible? With
> the StandaloneSessionClusterEntrypoint I was not able to reproduce the
> problem.
>
>
>
>
> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <568793...@qq.com> wrote:
>
> Hi Cristian,
>
>
> I don't know if it was designed to be like this deliberately.
>
> So I have already submitted an issue and am waiting for somebody to respond.
>
> https://issues.apache.org/jira/browse/FLINK-19154
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


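Re: Question about a problem running HiveCatalog on Flink 1.11.1

2020-09-09 Thread Rui Li
Hi,

When executing an insert, you need to wait in your code for the job to finish. You can follow how this util class does it: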
https://github.com/apache/flink/blob/release-1.11.1/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableEnvUtil.scala#L26

On Wed, Sep 9, 2020 at 2:01 PM zhongbaoluo  wrote:

> Options options = null;
> try {
>     OptionParser optionParser = new OptionParser(args);
>     options = optionParser.getOptions();
> } catch (Exception e) {
>     e.printStackTrace();
>     return;
> }
>
> String name = options.getName();
> String defaultDatabase = options.getDatabase(); // "dc_yunpingtai";
> String hiveConfDir = options.getHiveConfDir(); // "/Users/zhongbaoluo/Applications/app/apache-hive-3.1.2/conf"; // a local path
> String version = "3.1.2";
> String sql = options.getSql();
> HiveUtils.hiveConfDir(hiveConfDir);
>
> HiveConf hiveConf = new HiveConf();
> hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://dcmaster01:9083");
> hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, "/user/hive/warehouse"); // hdfs://datacloud-hadoop-cluster
> hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "dcmaster02:2181");
> hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "2181");
> hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, "1");
> hiveConf.set("hive.server2.zookeeper.namespace", "hiveserver2");
> hiveConf.set("hive.server2.zookeeper.publish.configs", "true");
> hiveConf.set("hive.server2.support.dynamic.service.discovery", "true");
> hiveConf.set("hive.metastore.warehouse.dir", "/user/hive/warehouse");
>
> try {
>     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
>     System.out.println("settings created");
>     TableEnvironment tableEnv = TableEnvironment.create(settings);
>     System.out.println("tableEnv created");
>
>     MyHiveCatalog hive = new MyHiveCatalog(name, defaultDatabase, hiveConf, version);
>     tableEnv.registerCatalog(name, hive);
>     System.out.println("hive created");
>
>     // set the HiveCatalog as the current catalog of the session
>     tableEnv.useCatalog(name);
>     tableEnv.useDatabase(defaultDatabase);
>     tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>     tableEnv.executeSql("show tables").print();
>     System.out.println("sql:" + sql);
>     // tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>
>     tableEnv.executeSql("DROP TABLE print_table");
>     tableEnv.executeSql("CREATE TABLE print_table(f0 BIGINT) WITH ('connector' = 'print')");
>     // tableEnv.executeSql("CREATE TABLE print_table_mysql (\n" +
>     //     "f0 BIGINT\n" +
>     //     ") WITH ('connector' = 'jdbc',\n" +
>     //     "'url' = 'jdbc:mysql://192.168.50.120:3306/datacloud?useUnicode=true=utf-8=true=true',\n" +
>     //     "'table-name' = 't_dc_test',\n" +
>     //     "'username' = 'dcuser',\n" +
>     //     "'password' = 'datacloud37')");
>     tableEnv.executeSql(sql); // here sql = INSERT INTO print_table select count(1) from t_mpos_integral_sign_water
>     // Table result = tableEnv.sqlQuery(sql); // "select count(1) from t_mpos_integral_sign_water"
>     System.out.println("tableResult created");
>     // result.execute().print();
> } catch (Exception e) {
>     e.printStackTrace();
> }
>
>
> Code of the MyHiveCatalog class:
>
>
> public class MyHiveCatalog extends HiveCatalog {
>     private static final Logger LOG = LoggerFactory.getLogger(MyHiveCatalog.class);
>
>     public MyHiveCatalog(String catalogName, @Nullable String defaultDatabase,
>             @Nullable HiveConf hiveConf, String hiveVersion) {
>         this(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase,
>                 createHiveConf(hiveConf), hiveVersion, false);
>     }
>
>     protected MyHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf,
>             String hiveVersion, boolean allowEmbedded) {
>         super(catalogName, defaultDatabase, hiveConf, hiveVersion, allowEmbedded);
>         // TODO Auto-generated constructor stub
>     }
>
>     private static HiveConf createHiveConf(@Nullable HiveConf hiveConf) {
>         // LOG.info("Setting hive conf dir as {}", hiveConfDir);
>
>         // try {
>         //     HiveConf.setHiveSiteLocation(
>         //         hiveConfDir == null ?
>         //             null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL());
>         // } catch (MalformedURLException e) {
>         //     throw new CatalogException(
>         //         String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
>         // }
>
>         // create HiveConf from hadoop configuration
>         Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(
>                 new org.apache.flink.configuration.Configuration());
>
>         // Add mapred-site.xml. We need to read 

Re: flink sql client: how to execute multiple SQL statements at once

2020-09-09 Thread LittleFall
Thank you both, I will give it a try.

I also hope the Flink SQL Client itself will support this mode.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


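Re: Manually modifying checkpoint state

2020-09-09 Thread Shuai Xia
OK, I found it, thanks.


--
From: xuhaiLong 
Sent: Wednesday, September 9, 2020 14:00
To: flink 中文社区 ; jkill...@dingtalk.com 

Cc: flink 中文社区 
Subject: Re: Manually modifying checkpoint state

You can refer to the State Processor API:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html

On September 9, 2020 at 11:41, Shuai Xia wrote:
Hi all, if I want to manually read and modify the state stored in a checkpoint, what approach can I use? I remember there used to be a utility class that supported this.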


Re: Question about a problem running HiveCatalog on Flink 1.11.1

2020-09-09 Thread zhongbaoluo
Options options = null;
try {
    OptionParser optionParser = new OptionParser(args);
    options = optionParser.getOptions();
} catch (Exception e) {
    e.printStackTrace();
    return;
}

String name = options.getName();
String defaultDatabase = options.getDatabase(); // "dc_yunpingtai";
String hiveConfDir = options.getHiveConfDir(); // "/Users/zhongbaoluo/Applications/app/apache-hive-3.1.2/conf"; // a local path
String version = "3.1.2";
String sql = options.getSql();
HiveUtils.hiveConfDir(hiveConfDir);

HiveConf hiveConf = new HiveConf();
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://dcmaster01:9083");
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, "/user/hive/warehouse"); // hdfs://datacloud-hadoop-cluster
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "dcmaster02:2181");
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "2181");
hiveConf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, "1");
hiveConf.set("hive.server2.zookeeper.namespace", "hiveserver2");
hiveConf.set("hive.server2.zookeeper.publish.configs", "true");
hiveConf.set("hive.server2.support.dynamic.service.discovery", "true");
hiveConf.set("hive.metastore.warehouse.dir", "/user/hive/warehouse");

try {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    System.out.println("settings created");
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    System.out.println("tableEnv created");

    MyHiveCatalog hive = new MyHiveCatalog(name, defaultDatabase, hiveConf, version);
    tableEnv.registerCatalog(name, hive);
    System.out.println("hive created");

    // set the HiveCatalog as the current catalog of the session
    tableEnv.useCatalog(name);
    tableEnv.useDatabase(defaultDatabase);
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tableEnv.executeSql("show tables").print();
    System.out.println("sql:" + sql);
    // tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

    tableEnv.executeSql("DROP TABLE print_table");
    tableEnv.executeSql("CREATE TABLE print_table(f0 BIGINT) WITH ('connector' = 'print')");
    // tableEnv.executeSql("CREATE TABLE print_table_mysql (\n" +
    //     "f0 BIGINT\n" +
    //     ") WITH ('connector' = 'jdbc',\n" +
    //     "'url' = 'jdbc:mysql://192.168.50.120:3306/datacloud?useUnicode=true=utf-8=true=true',\n" +
    //     "'table-name' = 't_dc_test',\n" +
    //     "'username' = 'dcuser',\n" +
    //     "'password' = 'datacloud37')");
    tableEnv.executeSql(sql); // here sql = INSERT INTO print_table select count(1) from t_mpos_integral_sign_water
    // Table result = tableEnv.sqlQuery(sql); // "select count(1) from t_mpos_integral_sign_water"
    System.out.println("tableResult created");
    // result.execute().print();
} catch (Exception e) {
    e.printStackTrace();
}


Code of the MyHiveCatalog class:


public class MyHiveCatalog extends HiveCatalog {
    private static final Logger LOG = LoggerFactory.getLogger(MyHiveCatalog.class);

    public MyHiveCatalog(String catalogName, @Nullable String defaultDatabase,
            @Nullable HiveConf hiveConf, String hiveVersion) {
        this(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase,
                createHiveConf(hiveConf), hiveVersion, false);
    }

    protected MyHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf,
            String hiveVersion, boolean allowEmbedded) {
        super(catalogName, defaultDatabase, hiveConf, hiveVersion, allowEmbedded);
        // TODO Auto-generated constructor stub
    }

    private static HiveConf createHiveConf(@Nullable HiveConf hiveConf) {
        // LOG.info("Setting hive conf dir as {}", hiveConfDir);

        // try {
        //     HiveConf.setHiveSiteLocation(
        //         hiveConfDir == null ?
        //             null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL());
        // } catch (MalformedURLException e) {
        //     throw new CatalogException(
        //         String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
        // }

        // create HiveConf from hadoop configuration
        Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(
                new org.apache.flink.configuration.Configuration());

        // Add mapred-site.xml. We need to read configurations like compression codec.
        for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(
                new org.apache.flink.configuration.Configuration())) {
            File mapredSite = new File(new File(possibleHadoopConfPath), "mapred-site.xml");
            if (mapredSite.exists()) {
                hadoopConf.addResource(new Path(mapredSite.getAbsolutePath()));
                break;
            }
        }
        HiveConf conf = new HiveConf(hadoopConf, HiveConf.class);
        conf.addResource(hiveConf);
        return conf;
    }
}

Thanks & Best Regards!

杉欣集团 - Technology Research Institute, Cloud Platform
钟保罗

23/F, Tower B, 振华广场, 3261 东方路, Pudong New Area, Shanghai (杉欣集团)
email: zhongbao...@shxgroup.net
Mobile: 18157855633


Original message
From: taochanglian
To: 
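Re: Manually modifying checkpoint state

2020-09-09 Thread xuhaiLong
You can refer to the State Processor API:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html


On September 9, 2020 at 11:41, Shuai Xia wrote:
Hi all, if I want to manually read and modify the state stored in a checkpoint, what approach can I use? I remember there used to be a utility class that supported this.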