回复: flink1.9写权限认证的es6

2020-07-16 Thread 夏帅
get到了





来自钉钉专属商务邮箱--
发件人:Yangze Guo
日 期:2020年07月17日 13:38:35
收件人:user-zh
主 题:Re: flink1.9写权限认证的es6

Hi,

SQL添加认证的逻辑已经在FLINK-18361[1] 中完成了,1.12版本会支持这个功能

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Fri, Jul 17, 2020 at 10:12 AM Dream-底限  wrote:
>
> hi:
> 请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下



Re:Re: flink 1.11任务提交的问题

2020-07-16 Thread sunfulin



hi,
感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
 to DataStream的语句不会生成拓扑。











在 2020-07-17 12:09:20,"godfrey he"  写道:
>hi sunfulin,
>目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
>即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
>虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
>
>Best,
>Godfrey
>
>Leonard Xu  于2020年7月17日周五 上午12:12写道:
>
>> Hi,
>>
>> 我理解目前好像做不到, cc: godfrey 大佬看看
>>
>> 祝好,
>> Leonard Xu
>>
>> > 在 2020年7月16日,23:08,sunfulin  写道:
>> >
>> > hi,
>> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
>> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
>> > 通过StreamExecutionEnvironment.execute提交,yarn
>> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
>>
>>


Re: flink1.9写权限认证的es6

2020-07-16 Thread Yangze Guo
Hi,

SQL添加认证的逻辑已经在FLINK-18361[1] 中完成了,1.12版本会支持这个功能

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Fri, Jul 17, 2020 at 10:12 AM Dream-底限  wrote:
>
> hi:
> 请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下


Re: map JSON to scala case class & off-heap optimization

2020-07-16 Thread Georg Heiler
Many thanks!

Am Mi., 15. Juli 2020 um 15:58 Uhr schrieb Aljoscha Krettek <
aljos...@apache.org>:

> On 11.07.20 10:31, Georg Heiler wrote:
> > 1) similarly to spark the Table API works on some optimized binary
> > representation
> > 2) this is only available in the SQL way of interaction - there is no
> > programmatic API
>
> yes it's available from SQL, but also the Table API, which is a
> programmatic declarative API, similar to Spark's Structured Streaming.
>
>
> > q1) I have read somewhere (I think in some Flink Forward presentations)
> > that the SQL API is not necessarily stable with regards to state - even
> > with small changes to the DAG (due to optimization). So does this also
> > /still apply to the table API? (I assume yes)
>
> Yes, unfortunately this is correct. Because the Table API/SQL is
> declarative users don't have control over the DAG and the state that the
> operators have. Some work will happen on at least making sure that the
> optimizer stays stable between Flink versions or that we can let users
> pin a certain physical graph of a query so that it can be re-used across
> versions.
>
> > q2) When I use the DataSet/Stream (classical scala/java) API it looks
> like
> > I must create a custom serializer if I want to handle one/all of:
> >
> >- side-output failing records and not simply crash the job
> >- as asked before automatic serialization to a scala (case) class
>
> This is true, yes.
>
> > But I also read that creating the ObjectMapper (i.e. in Jackson terms)
> > inside the map function is not recommended. From Spark I know that there
> is
> > a map-partitions function, i.e. something where a database connection can
> > be created and then reused for the individua elements. Is a similar
> > construct available in Flink as well?
>
> Yes, for this you can use "rich functions", which have an open()/close()
> method that allows initializing and re-using resources across
> invocations:
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#rich-functions
>
> > Also, I have read a lot of articles and it looks like a lot of people
> > are using the String serializer and then manually parse the JSON which
> also
> > seems inefficient.
> > Where would I find an example for some Serializer with side outputs for
> > failed records as well as efficient initialization using some similar
> > construct to map-partitions?
>
> I'm not aware of such examples, unfortunately.
>
> I hope that at least some answers will be helpful!
>
> Best,
> Aljoscha
>


flink-1.11 DDL ????hdfs???? Cannot instantiate user function

2020-07-16 Thread kcz
standalone
lib jar??
flink-connector-hive_2.11-1.11.0.jar
flink-json-1.11.0.jar
flink-sql-connector-kafka_2.12-1.11.0.jar 
log4j-api-2.12.1.jar
flink-csv-1.11.0.jar
flink-parquet_2.11-1.11.0.jar   
 flink-table_2.11-1.11.0.jar  
  log4j-core-2.12.1.jar
flink-dist_2.11-1.11.0.jar   
  flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar 
flink-table-blink_2.11-1.11.0.jar 
log4j-slf4j-impl-2.12.1.jar
flink-hadoop-compatibility_2.11-1.11.0.jar 
flink-shaded-zookeeper-3.4.14.jar  
log4j-1.2-api-2.12.1.jar





??idea
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
// 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);;
env.setStateBackend(new FsStateBackend(path));

tableEnv.executeSql("CREATE TABLE source_table (\n" +
"\thost STRING,\n" +
"\turl STRING,\n" +
"\tpublic_date STRING\n" +
") WITH (\n" +
"\t'connector.type' = 'kafka',\n" +
"\t'connector.version' = 'universal',\n" +
"\t'connector.startup-mode' = 'latest-offset',\n" +
"\t'connector.topic' = 'test_flink_1.11',\n" +
"\t'connector.properties.group.id' = 'domain_testGroup',\n" +
"\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
"\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
"\t'update-mode' = 'append',\n" +
"\t'format.type' = 'json',\n" +
"\t'format.derive-schema' = 'true'\n" +
")");

tableEnv.executeSql("CREATE TABLE fs_table (\n" +
"  host STRING,\n" +
"  url STRING,\n" +
"  public_date STRING\n" +
") PARTITIONED BY (public_date) WITH (\n" +
"  'connector'='filesystem',\n" +
"  'path'='path',\n" +
"  'format'='json',\n" +
"  'sink.partition-commit.delay'='0s',\n" +
"  'sink.partition-commit.policy.kind'='success-file'\n" +
")");

tableEnv.executeSql("INSERT INTO  fs_table SELECT host, url, 
DATE_FORMAT(public_date, '-MM-dd') FROM source_table");
TableResult result = tableEnv.executeSql("SELECT * FROM fs_table ");
result.print();

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Congxian Qiu
Hi  Peihui

感谢你的回复,我这边没有看到附件,你那边能否确认下呢?

Best,
Congxian


Peihui He  于2020年7月17日周五 上午10:13写道:

> Hi Congxian
>
> 见附件。
>
> Best wishes.
>
> Congxian Qiu  于2020年7月16日周四 下午8:24写道:
>
>> Hi Peihui
>>
>> 感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug
>> 日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可,
>> 非常感谢~
>>
>> [1] https://gist.github.com/
>>
>> Best,
>> Congxian
>>
>>
>> Peihui He  于2020年7月16日周四 下午5:54写道:
>>
>> > Hi Yun,
>> >
>> > 我这边测试需要在集群上跑的,本地idea跑是没有问题的。
>> > flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。
>> >
>> > 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。
>> >
>> > Peihui He  于2020年7月16日周四 下午5:26写道:
>> >
>> >> Hi Yun,
>> >>
>> >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
>> >>
>> >> Best wishes.
>> >>
>> >> Yun Tang  于2020年7月16日周四 下午5:04写道:
>> >>
>> >>> Hi Peihui
>> >>>
>> >>> Flink-1.10.1
>> >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
>> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>> >>>
>> >>>
>> >>> [1]
>> >>>
>> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
>> >>> 祝好
>> >>> 唐云
>> >>> 
>> >>> From: Peihui He 
>> >>> Sent: Thursday, July 16, 2020 16:15
>> >>> To: user-zh@flink.apache.org 
>> >>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >>>
>> >>> Hi Yun,
>> >>>
>> >>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
>> >>> 输入的特定的word抛出runtimeexception 使task
>> >>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
>> >>>
>> >>> Caused by: java.nio.file.NoSuchFileException:
>> >>>
>> >>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> >>> ->
>> >>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>> >>>
>> >>> 情况和@chenxyz 类似。
>> >>>
>> >>>
>> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
>> >>>
>> >>> 换成1.10.1 就可以了
>> >>>
>> >>> Best wishes.
>> >>>
>> >>> Yun Tang  于2020年7月15日周三 下午4:35写道:
>> >>>
>> >>> > Hi Robin
>> >>> >
>> >>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
>> >>> >
>> >>>
>> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
>> >>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>> >>> >
>> >>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root
>> >>> cause,还请在日志中找一下无法恢复的root
>> >>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>> >>> >
>> >>> >
>> >>> > [1]
>> >>> >
>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>> >>> > [2]
>> >>> >
>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>> >>> >
>> >>> > 祝好
>> >>> > 唐云
>> >>> >
>> >>> >
>> >>> > 
>> >>> > From: Robin Zhang 
>> >>> > Sent: Wednesday, July 15, 2020 16:23
>> >>> > To: user-zh@flink.apache.org 
>> >>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >>> >
>> >>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>> >>> >
>> >>> > Best
>> >>> > Robin Zhang
>> >>> > 
>> >>> > From: Peihui He <[hidden email]>
>> >>> > Sent: Tuesday, July 14, 2020 10:42
>> >>> > To: [hidden email] <[hidden email]>
>> >>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >>> >
>> >>> > hello,
>> >>> >
>> >>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>> >>> >
>> >>> >
>> >>> > Caused by: java.nio.file.NoSuchFileException:
>> >>> >
>> >>> >
>> >>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> >>> > ->
>> >>> >
>> >>> >
>> >>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>> >>> >
>> >>> > 配置和1.9.2 一样:
>> >>> > state.backend: rocksdb
>> >>> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
>> >>> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
>> >>> > state.backend.incremental: true
>> >>> >
>> >>> > 代码上都有
>> >>> >
>> >>> > env.enableCheckpointing(1);
>> >>> >
>> >>> >
>> >>>
>> 

flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到 kafka

2020-07-16 Thread wangl...@geekplus.com.cn

 INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*) from 
kafka_ods_artemis_out_order group by warehouse_id;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink 
'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming update 
changes which is produced by node GroupAggregate(groupBy=[warehouse_id], 
select=[warehouse_id, COUNT(*) AS EXPR$1])

在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。
 
我看现在 Flink-1.11 中是用了  KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让 GroupBy 
的结果也发送到 Kafka 呢?

谢谢,
王磊 


wangl...@geekplus.com.cn 



Re: Pyflink sink rowtime field

2020-07-16 Thread Xingbo Huang
Hi Jesse,
I think that the type of rowtime you declared on the source schema is
DataTypes.Timestamp(), you also use DataTypes.Timestamp() on the sink schema

Best,
Xingbo

Jesse Lord  于2020年7月15日周三 下午11:41写道:

> I am trying to sink the rowtime field in pyflink 1.10. I get the following
> error
>
>
>
> For the source schema I use
>
>
>
> .field("rowtime", DataTypes.TIMESTAMP(2))
>
> .rowtime(
>
> Rowtime()
>
> .timestamps_from_field("timestamp")
>
> .watermarks_periodic_ascending()
>
> )
>
>
>
> To create the rowtime field and have tried variations on
>
>
>
> .field("rowtime", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
>
>
>
> In the sink schema.
>
>
>
> Trying all of the different types in DataTypes I get essentially the
> following error:
>
>
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o56.insertInto.
>
> : org.apache.flink.table.api.ValidationException: Field types of query
> result and registered TableSink
> `default_catalog`.`default_database`.`output` do not match.
>
> Query result schema: [rowtime: LocalDateTime]
>
> TableSink schema:[rowtime: Timestamp]
>
>
>
>
>
> I know that in Java there is
> org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME and the python
> documentation lists Types.SQL_TIMESTAMP, but I cannot find the
> corresponding type in the python library. Can anyone help point me to the
> correct type for the schema?
>
>
>
> Thanks,
>
> Jesse
>
>
>
>
>
>
>
>
>
>
>


Re: flink 1.11任务提交的问题

2020-07-16 Thread godfrey he
hi sunfulin,
目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。

Best,
Godfrey

Leonard Xu  于2020年7月17日周五 上午12:12写道:

> Hi,
>
> 我理解目前好像做不到, cc: godfrey 大佬看看
>
> 祝好,
> Leonard Xu
>
> > 在 2020年7月16日,23:08,sunfulin  写道:
> >
> > hi,
> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> > 通过StreamExecutionEnvironment.execute提交,yarn
> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
>
>


Re: Performance test Flink vs Storm

2020-07-16 Thread Xintong Song
>
> From this exercise , I understand that increasing JVM memory would
> directly support/increase throughout. Am i correct?
>
It depends. Smaller heap space means more frequent GCs, which occupies
the cpu processing time and also introduces more pauses to your program. If
you already have large enough heap space, then you can hardly benefit from
further increasing it.

I'm not aware of any benchmark for Kafka connectors. You can check
flink-benchmarks[1], and maybe fork the repository and develop your own
Kafka connector benchmark based on it.

Thank you~

Xintong Song


[1] https://github.com/apache/flink-benchmarks

On Fri, Jul 17, 2020 at 10:54 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi,
>
> After making the task.managed. Memory. fraction as 0 , i see that JVM heap
> memory increased from 512 mb to 1 GB.
>
> Earlier I was getting a maximum of 4-6k per second throughput on Kafka
> source for ingestion rate of 12k+/second. Now I see that improved to 11k
> per task(parallelism of 1) and 16.5k+ second when run with parallelism of
> 2. (8.25k per task)..
>
> The maximum memory used during the run was 500 mb of heap space.
>
> From this exercise , I understand that increasing JVM memory would
> directly support/increase throughout. Am i correct?
>
> Our goal is to test for 100k ingestion per second and try to calculate
> cost for 1 million per second ( hope it's linear relation)
>
> I also saw the CPU utilisation peaked to 50% during the same.
>
> 1) Let me know what you think of the same, as I would continue to test.
>
> 2) Is there a benchmark for number of records handled per Kafka connector
> task for a particular JVM heap number.
>
> Thanks,
> Prasanna
>
> On Fri 17 Jul, 2020, 06:18 Xintong Song,  wrote:
>
>> *I had set Checkpoint to use the Job manager backend.*
>>
>> Jobmanager backend also runs in JVM heap space and does not use managed
>> memory. Setting managed memory fraction to 0 will give you larger JVM heap
>> space, thus lesser GC pressure.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Jul 16, 2020 at 10:38 PM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>>
>>> Xintong Song,
>>>
>>>
>>>- Which version of Flink is used?*1.10*
>>>- Which deployment mode is used? *Standalone*
>>>- Which cluster mode is used? *Job*
>>>- Do you mean you have a 4core16gb node for each task manager, and
>>>each task manager has 4 slots? *Yeah*. *There are totally 3
>>>taskmanagers in the cluster.  2TMs are t2.medium machine 2 core 4 gb per
>>>machine. 1 slot per core. 1TM is t2.large 4core 16gb . 4slots in the
>>>machine. There were other jobs running in the t2.medium TMs. T2.large
>>>machine is where the performance testing job was running. *
>>>- Sounds like you are running a streaming job without using any
>>>state. Have you tuned the managed memory fraction
>>>(`taskmanager.memory.managed.fraction`) to zero as suggested in the
>>>document[1]?  *No i have not set the
>>>taskmanager.memory.network.fraction to 0. I had set Checkpoint to use the
>>>Job manager backend. *
>>>- *The CPU maximum spike i spotted was 40%. *
>>>
>>> *Between i did some latest test only on t2.medium machine with 2 slots
>>> per core. 1million records with 10k/s ingestion rate. Parallelism was 1. *
>>> *I added rebalance to the inputstream.   ex: *
>>> inputStream.rebalance().map()
>>> *I was able to get latency in the range 130ms - 2sec.*
>>>
>>> Let me also know if there are more things to consider here.
>>>
>>> Thanks
>>> Prasanna.
>>>
>>> On Thu, Jul 16, 2020 at 4:04 PM Xintong Song 
>>> wrote:
>>>
 Hi Prasanna,

 Trying to understand how Flink is deployed.

- Which version of Flink is used?
- Which deployment mode is used? (Standalone/Kubernetes/Yarn/Mesos)
- Which cluster mode is used? (Job/Session)
- Do you mean you have a 4core16gb node for each task manager, and
each task manager has 4 slots?
- Sounds like you are running a streaming job without using any
state. Have you tuned the managed memory fraction
(`taskmanager.memory.managed.fraction`) to zero as suggested in the
document[1]?

 When running a stateless job or using a heap state backend
> (MemoryStateBackend or FsStateBackend), set managed memory to zero.
>

 I can see a few potential problems.

- Managed memory is probably not configured. That means a
significant fraction of memory is unused.
- It sounds like the CPU processing time is not the bottleneck.
Thus increasing the parallelism will not give you better performance, 
 but
will on the other hand increase the overhead load on the task manager.

 Also pulled in Becket Qin, who is the expert of Kafka connectors. Since
 you have observed lack of performance in reading from Kafka compared to
 Storm.

 Thank you~

 

回复:flink connector formats问题

2020-07-16 Thread 酷酷的浑蛋
找到了,谢谢


| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制


在2020年07月17日 10:57,酷酷的浑蛋 写道:


我看您写了'format.type' = ‘custom',这个custom 是跟哪里关联的呢? 还是说这里要写类路径?


在2020年07月17日 10:47,夏帅 写道:
你好,这个是可以进行自定义的
参考https://jxeditor.github.io/2020/06/11/FlinkSQL%E8%87%AA%E5%AE%9A%E4%B9%89FORMAT_TYPE/


--
发件人:酷酷的浑蛋 
发送时间:2020年7月17日(星期五) 10:42
收件人:user-zh 
主 题:flink connector formats问题

请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢?
目前flink支持的:
| 格式 | 支持的连接器 |
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |

回复:flink connector formats问题

2020-07-16 Thread 酷酷的浑蛋


我看您写了'format.type' = ‘custom',这个custom 是跟哪里关联的呢? 还是说这里要写类路径?


在2020年07月17日 10:47,夏帅 写道:
你好,这个是可以进行自定义的
参考https://jxeditor.github.io/2020/06/11/FlinkSQL%E8%87%AA%E5%AE%9A%E4%B9%89FORMAT_TYPE/


--
发件人:酷酷的浑蛋 
发送时间:2020年7月17日(星期五) 10:42
收件人:user-zh 
主 题:flink connector formats问题

请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢?
目前flink支持的:
| 格式 | 支持的连接器 |
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |

Re: Flink yarn session exception

2020-07-16 Thread Rainie Li
好搭,谢谢!

On Thu, Jul 16, 2020 at 5:32 PM 忝忝向仧 <153488...@qq.com> wrote:

> 你可以看看lib里面的包跟官网的要求是不是一样的
>
>
>
> 发自我的iPhone
>
>
> -- Original --
> From: Rainie Li  Date: Fri,Jul 17,2020 1:06 AM
> To: user-zh  Subject: Re: Flink yarn session exception
>
>
>
> 多谢,我set了这些envs:
>
> export JAVA_HOME=/usr/lib/jvm/java-8-oracle
> export PATH=$JAVA_HOME/bin:$PATH
> export HADOOP_HOME=/usr/local/hadoop
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
> export HADOOP_CLASSPATH=`hadoop classpath`
> export FLINK_CONF_DIR=/etc/flink-1.9.1/conf
> export FLINK_LOG_DIR=/home/rainieli/
>
> 有什么问题吗?
>
>
> On Thu, Jul 16, 2020 at 1:12 AM Paul Lam 
>  日志里说得比较清楚了,classpath 里没有 Hadoop 的 lib。可以参考这个文档 [1] 来配置你的环境。
> 
>  1.
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
>  <
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
>  
>  Best,
>  Paul Lam
> 
>   2020年7月16日 15:46,Rainie Li  写道:
>  
>   大佬们好,我是flink新手,正在用flink 1.9.1
>   Flink APP cannot run, APP log error, 想求教一下会是什么原因造成的,多谢
>  
>   2020-06-16 17:06:21,921 WARN
> org.apache.flink.client.cli.CliFrontend
> 
> 
> - Could not load CLI class
>   org.apache.flink.yarn.cli.FlinkYarnSessionCli.
>   java.lang.NoClassDefFoundError:
>   org/apache/hadoop/yarn/exceptions/YarnException
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:264)
>   at
>  
> 
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
>   at
>  
> 
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
>   at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
>   Caused by: java.lang.ClassNotFoundException:
>   org.apache.hadoop.yarn.exceptions.YarnException
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 5 more
>   2020-06-16 17:06:21,980 INFO
> org.apache.flink.core.fs.FileSystem
> 
> 
> - Hadoop is not in the classpath/dependencies. The
>  extended
>   set of supported File Systems via Hadoop is not available.
> 
> 


Re: Performance test Flink vs Storm

2020-07-16 Thread Prasanna kumar
Hi,

After making the task.managed. Memory. fraction as 0 , i see that JVM heap
memory increased from 512 mb to 1 GB.

Earlier I was getting a maximum of 4-6k per second throughput on Kafka
source for ingestion rate of 12k+/second. Now I see that improved to 11k
per task(parallelism of 1) and 16.5k+ second when run with parallelism of
2. (8.25k per task)..

The maximum memory used during the run was 500 mb of heap space.

>From this exercise , I understand that increasing JVM memory would directly
support/increase throughout. Am i correct?

Our goal is to test for 100k ingestion per second and try to calculate cost
for 1 million per second ( hope it's linear relation)

I also saw the CPU utilisation peaked to 50% during the same.

1) Let me know what you think of the same, as I would continue to test.

2) Is there a benchmark for number of records handled per Kafka connector
task for a particular JVM heap number.

Thanks,
Prasanna

On Fri 17 Jul, 2020, 06:18 Xintong Song,  wrote:

> *I had set Checkpoint to use the Job manager backend.*
>
> Jobmanager backend also runs in JVM heap space and does not use managed
> memory. Setting managed memory fraction to 0 will give you larger JVM heap
> space, thus lesser GC pressure.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Jul 16, 2020 at 10:38 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>>
>> Xintong Song,
>>
>>
>>- Which version of Flink is used?*1.10*
>>- Which deployment mode is used? *Standalone*
>>- Which cluster mode is used? *Job*
>>- Do you mean you have a 4core16gb node for each task manager, and
>>each task manager has 4 slots? *Yeah*. *There are totally 3
>>taskmanagers in the cluster.  2TMs are t2.medium machine 2 core 4 gb per
>>machine. 1 slot per core. 1TM is t2.large 4core 16gb . 4slots in the
>>machine. There were other jobs running in the t2.medium TMs. T2.large
>>machine is where the performance testing job was running. *
>>- Sounds like you are running a streaming job without using any
>>state. Have you tuned the managed memory fraction
>>(`taskmanager.memory.managed.fraction`) to zero as suggested in the
>>document[1]?  *No i have not set the
>>taskmanager.memory.network.fraction to 0. I had set Checkpoint to use the
>>Job manager backend. *
>>- *The CPU maximum spike i spotted was 40%. *
>>
>> *Between i did some latest test only on t2.medium machine with 2 slots
>> per core. 1million records with 10k/s ingestion rate. Parallelism was 1. *
>> *I added rebalance to the inputstream.   ex: *
>> inputStream.rebalance().map()
>> *I was able to get latency in the range 130ms - 2sec.*
>>
>> Let me also know if there are more things to consider here.
>>
>> Thanks
>> Prasanna.
>>
>> On Thu, Jul 16, 2020 at 4:04 PM Xintong Song 
>> wrote:
>>
>>> Hi Prasanna,
>>>
>>> Trying to understand how Flink is deployed.
>>>
>>>- Which version of Flink is used?
>>>- Which deployment mode is used? (Standalone/Kubernetes/Yarn/Mesos)
>>>- Which cluster mode is used? (Job/Session)
>>>- Do you mean you have a 4core16gb node for each task manager, and
>>>each task manager has 4 slots?
>>>- Sounds like you are running a streaming job without using any
>>>state. Have you tuned the managed memory fraction
>>>(`taskmanager.memory.managed.fraction`) to zero as suggested in the
>>>document[1]?
>>>
>>> When running a stateless job or using a heap state backend
 (MemoryStateBackend or FsStateBackend), set managed memory to zero.

>>>
>>> I can see a few potential problems.
>>>
>>>- Managed memory is probably not configured. That means a
>>>significant fraction of memory is unused.
>>>- It sounds like the CPU processing time is not the bottleneck. Thus
>>>increasing the parallelism will not give you better performance, but will
>>>on the other hand increase the overhead load on the task manager.
>>>
>>> Also pulled in Becket Qin, who is the expert of Kafka connectors. Since
>>> you have observed lack of performance in reading from Kafka compared to
>>> Storm.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#heap-state-backend
>>>
>>> On Thu, Jul 16, 2020 at 10:35 AM Prasanna kumar <
>>> prasannakumarram...@gmail.com> wrote:
>>>
 Hi

 Sending to you all separately as you answered one of my earlier query.

 Thanks,
 Prasanna.


 -- Forwarded message -
 From: Prasanna kumar 
 Date: Wed 15 Jul, 2020, 23:27
 Subject: Performance test Flink vs Storm
 To: , user 


 Hi,

 We are testing flink and storm for our streaming pipelines on various
 features.

 In terms of Latency,i see the flink comes up short on storm even if
 more CPU is given to it. Will Explain in detail.

 *Machine*. t2.large 4 core 16 gb. is used for Used for 

回复:flink1.9写权限认证的es6

2020-07-16 Thread 夏帅
你好,请问是FlinkSQL么
FLinkSQL可以参考下这份邮件
http://apache-flink.147419.n8.nabble.com/ddl-es-td2094.html
DataStream可以尝试自定义ElasticsearchSink实现权限认证


--
发件人:Dream-底限 
发送时间:2020年7月17日(星期五) 10:12
收件人:user-zh 
主 题:flink1.9写权限认证的es6

hi:
请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下


回复:flink connector formats问题

2020-07-16 Thread 夏帅
你好,这个是可以进行自定义的
参考https://jxeditor.github.io/2020/06/11/FlinkSQL%E8%87%AA%E5%AE%9A%E4%B9%89FORMAT_TYPE/


--
发件人:酷酷的浑蛋 
发送时间:2020年7月17日(星期五) 10:42
收件人:user-zh 
主 题:flink connector formats问题

请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢?
目前flink支持的:
| 格式 | 支持的连接器 |
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |

flink connector formats问题

2020-07-16 Thread 酷酷的浑蛋
请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢?
目前flink支持的:
| 格式 | 支持的连接器 |
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |

Re: flink 1.11 checkpoint使用

2020-07-16 Thread Jark Wu
Hi,

能确认一下 kafka 中有完整的全量数据吗? 也就是 这个 DELETE 消息之前,有对应的 INSERT 消息吗?
如果没有的话,是可能会发生这个现象的(DELETE 在 group by 节点会被认为脏数据而丢掉)。
当然也可以像 godfrey 建议的那样,不 groupby,直接全部字段 INSERT INTO sink,DELETE 就不会被丢弃掉。

Best,
Jark

On Thu, 16 Jul 2020 at 21:56, godfrey he  wrote:

> 为什么要 GROUP BY id,name ,description, weight ?
> 直接 "INSERT INTO sink SELECT  id,name ,description, weight FROM
> debezium_source" 不能满足需求?
>
> 曹武 <14701319...@163.com> 于2020年7月16日周四 下午9:30写道:
>
> > 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
> > 从checkpoint恢复以后,新来op=d的数据会删除失败
> > 重启命令:./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
> >
> >
> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
> > 代码:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
> > .useBlinkPlanner()
> > .inStreamingMode()
> > .build();
> >
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >
> > env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> > env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间
> > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //
> > 最大允许同时出现几个CheckPoint
> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); //
> > 最小得间隔时间
> > env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
> //
> > 是否倾向于用CheckPoint做故障恢复
> > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
> > //
> > 容忍多少次CheckPoint失败
> > //Checkpoint文件清理策略
> >
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > //Checkpoint外部文件路径
> > env.setStateBackend(new FsStateBackend(new
> > URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
> > TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> > settings);
> > String sourceDDL = String.format(
> > "CREATE TABLE debezium_source (" +
> > " id INT NOT NULL," +
> > " name STRING," +
> > " description STRING," +
> > " weight Double" +
> > ") WITH (" +
> > " 'connector' = 'kafka-0.11'," +
> > " 'topic' = '%s'," +
> > " 'properties.bootstrap.servers' = '%s'," +
> > " 'scan.startup.mode' = 'group-offsets'," +
> > " 'format' = 'debezium-json'" +
> > ")", "ddd", " 172.22.20.206:9092");
> > String sinkDDL = "CREATE TABLE sink (" +
> > " id INT NOT NULL," +
> > " name STRING," +
> > " description STRING," +
> > " weight Double," +
> > " PRIMARY KEY (id,name, description,weight) NOT ENFORCED
> "
> > +
> > ") WITH (" +
> > " 'connector' = 'jdbc'," +
> > " 'url' =
> > 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
> > " 'table-name' = 'products'," +
> > " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
> > " 'username'='DataPip'," +
> > " 'password'='DataPip'" +
> > ")";
> > String dml = "INSERT INTO sink SELECT  id,name ,description,
> weight
> > FROM debezium_source GROUP BY id,name ,description, weight";
> > tEnv.executeSql(sourceDDL);
> > tEnv.executeSql(sinkDDL);
> > tEnv.executeSql(dml);
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi Congxian

见附件。

Best wishes.

Congxian Qiu  于2020年7月16日周四 下午8:24写道:

> Hi Peihui
>
> 感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug
> 日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可,
> 非常感谢~
>
> [1] https://gist.github.com/
>
> Best,
> Congxian
>
>
> Peihui He  于2020年7月16日周四 下午5:54写道:
>
> > Hi Yun,
> >
> > 我这边测试需要在集群上跑的,本地idea跑是没有问题的。
> > flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。
> >
> > 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。
> >
> > Peihui He  于2020年7月16日周四 下午5:26写道:
> >
> >> Hi Yun,
> >>
> >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
> >>
> >> Best wishes.
> >>
> >> Yun Tang  于2020年7月16日周四 下午5:04写道:
> >>
> >>> Hi Peihui
> >>>
> >>> Flink-1.10.1
> >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
> >>>
> >>>
> >>> [1]
> >>>
> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
> >>> 祝好
> >>> 唐云
> >>> 
> >>> From: Peihui He 
> >>> Sent: Thursday, July 16, 2020 16:15
> >>> To: user-zh@flink.apache.org 
> >>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >>>
> >>> Hi Yun,
> >>>
> >>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
> >>> 输入的特定的word抛出runtimeexception 使task
> >>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
> >>>
> >>> Caused by: java.nio.file.NoSuchFileException:
> >>>
> >>>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> >>> ->
> >>>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> >>>
> >>> 情况和@chenxyz 类似。
> >>>
> >>>
> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
> >>>
> >>> 换成1.10.1 就可以了
> >>>
> >>> Best wishes.
> >>>
> >>> Yun Tang  于2020年7月15日周三 下午4:35写道:
> >>>
> >>> > Hi Robin
> >>> >
> >>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
> >>> >
> >>>
> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
> >>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
> >>> >
> >>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root
> >>> cause,还请在日志中找一下无法恢复的root
> >>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
> >>> >
> >>> >
> >>> > [1]
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
> >>> > [2]
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
> >>> >
> >>> > 祝好
> >>> > 唐云
> >>> >
> >>> >
> >>> > 
> >>> > From: Robin Zhang 
> >>> > Sent: Wednesday, July 15, 2020 16:23
> >>> > To: user-zh@flink.apache.org 
> >>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >>> >
> >>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
> >>> >
> >>> > Best
> >>> > Robin Zhang
> >>> > 
> >>> > From: Peihui He <[hidden email]>
> >>> > Sent: Tuesday, July 14, 2020 10:42
> >>> > To: [hidden email] <[hidden email]>
> >>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >>> >
> >>> > hello,
> >>> >
> >>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> >>> >
> >>> >
> >>> > Caused by: java.nio.file.NoSuchFileException:
> >>> >
> >>> >
> >>>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> >>> > ->
> >>> >
> >>> >
> >>>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> >>> >
> >>> > 配置和1.9.2 一样:
> >>> > state.backend: rocksdb
> >>> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> >>> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
> >>> > state.backend.incremental: true
> >>> >
> >>> > 代码上都有
> >>> >
> >>> > env.enableCheckpointing(1);
> >>> >
> >>> >
> >>>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >>> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> >>> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> >>> >
> >>> >
> >>> >   是1.10.0 需要做什么特别配置么?
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > 

flink1.9写权限认证的es6

2020-07-16 Thread Dream-底限
hi:
请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下


Re: Flink on k8s 中,Jar 任务 avatica-core 依赖和 flink-table jar 冲突问题

2020-07-16 Thread LakeShen
嗯嗯,Congxian,感谢你的回复,我通过 Maven Shaded 解决问题。

Congxian Qiu  于2020年7月16日周四 下午8:19写道:

> Hi
>
> 你的图挂了,如果单纯想解决 jar 包冲突的问题,那么 maven shade plugin[1] 或许对你有用
>
> [1]
>
> https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html
> Best,
> Congxian
>
>
> LakeShen  于2020年7月16日周四 下午6:03写道:
>
> > Hi 社区,
> >
> > 我现在正在迁移任务到 k8s ,目前版本为 Flink 1.6 版本,k8s 上面作业运行模式为 standalone per job.
> >
> > 现在遇到一个问题,业务方 Flink jar 任务使用了 org.apache.calcite.avatica 依赖,也就是下面依赖:
> > 
> > org.apache.calcite.avatica
> > avatica-core
> > ${avatica.version}
> > 
> >
> > 但是这个依赖其实在 flink-table 模块中,也有这个依赖:
> > [image: image.png]
> >
> > 由于 flink on k8s  standalone per job 模式,会把 Flink 任务 jar 包放入到 flink 本身的lib
> > 包中,我在任务启动的时候,就会报:
> > Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> > org.apache.calcite.avatica.ConnectionPropertiesImpl 错误。
> >
> > 按照我的理解,由于 Flink jar 任务包中有 avatica-core 依赖,同时在 flink lib
> > 目录下面,flink-table_2.11-1.6-RELEASE.jar 中也有这个依赖,这两个都在 lib 目录下,然后就出现了类冲突问题。
> >
> > 请问怎么解决这个问题呢,非常期待你的回复。
> >
> > Best,
> > LakeShen
> >
> >
> >
>


Re: Performance test Flink vs Storm

2020-07-16 Thread Xintong Song
>
> *I had set Checkpoint to use the Job manager backend.*

Jobmanager backend also runs in JVM heap space and does not use managed
memory. Setting managed memory fraction to 0 will give you larger JVM heap
space, thus lesser GC pressure.

Thank you~

Xintong Song



On Thu, Jul 16, 2020 at 10:38 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

>
> Xintong Song,
>
>
>- Which version of Flink is used?*1.10*
>- Which deployment mode is used? *Standalone*
>- Which cluster mode is used? *Job*
>- Do you mean you have a 4core16gb node for each task manager, and
>each task manager has 4 slots? *Yeah*. *There are totally 3
>taskmanagers in the cluster.  2TMs are t2.medium machine 2 core 4 gb per
>machine. 1 slot per core. 1TM is t2.large 4core 16gb . 4slots in the
>machine. There were other jobs running in the t2.medium TMs. T2.large
>machine is where the performance testing job was running. *
>- Sounds like you are running a streaming job without using any state.
>Have you tuned the managed memory fraction
>(`taskmanager.memory.managed.fraction`) to zero as suggested in the
>document[1]?  *No i have not set the
>taskmanager.memory.network.fraction to 0. I had set Checkpoint to use the
>Job manager backend. *
>- *The CPU maximum spike i spotted was 40%. *
>
> *Between i did some latest test only on t2.medium machine with 2 slots per
> core. 1million records with 10k/s ingestion rate. Parallelism was 1. *
> *I added rebalance to the inputstream.   ex: *
> inputStream.rebalance().map()
> *I was able to get latency in the range 130ms - 2sec.*
>
> Let me also know if there are more things to consider here.
>
> Thanks
> Prasanna.
>
> On Thu, Jul 16, 2020 at 4:04 PM Xintong Song 
> wrote:
>
>> Hi Prasanna,
>>
>> Trying to understand how Flink is deployed.
>>
>>- Which version of Flink is used?
>>- Which deployment mode is used? (Standalone/Kubernetes/Yarn/Mesos)
>>- Which cluster mode is used? (Job/Session)
>>- Do you mean you have a 4core16gb node for each task manager, and
>>each task manager has 4 slots?
>>- Sounds like you are running a streaming job without using any
>>state. Have you tuned the managed memory fraction
>>(`taskmanager.memory.managed.fraction`) to zero as suggested in the
>>document[1]?
>>
>> When running a stateless job or using a heap state backend
>>> (MemoryStateBackend or FsStateBackend), set managed memory to zero.
>>>
>>
>> I can see a few potential problems.
>>
>>- Managed memory is probably not configured. That means a significant
>>fraction of memory is unused.
>>- It sounds like the CPU processing time is not the bottleneck. Thus
>>increasing the parallelism will not give you better performance, but will
>>on the other hand increase the overhead load on the task manager.
>>
>> Also pulled in Becket Qin, who is the expert of Kafka connectors. Since
>> you have observed lack of performance in reading from Kafka compared to
>> Storm.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#heap-state-backend
>>
>> On Thu, Jul 16, 2020 at 10:35 AM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> Sending to you all separately as you answered one of my earlier query.
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>>
>>> -- Forwarded message -
>>> From: Prasanna kumar 
>>> Date: Wed 15 Jul, 2020, 23:27
>>> Subject: Performance test Flink vs Storm
>>> To: , user 
>>>
>>>
>>> Hi,
>>>
>>> We are testing flink and storm for our streaming pipelines on various
>>> features.
>>>
>>> In terms of Latency,i see the flink comes up short on storm even if more
>>> CPU is given to it. Will Explain in detail.
>>>
>>> *Machine*. t2.large 4 core 16 gb. is used for Used for flink task
>>> manager and storm supervisor node.
>>> *Kafka Partitions* 4
>>> *Messages tested:* 1million
>>> *Load* : 50k/sec
>>>
>>> *Scenario*:
>>> Read from Kafka -> Transform (Map to a different JSON format) - > Write
>>> to a Kafka topic.
>>>
>>> *Test 1*
>>> Storm Parallelism is set as 1. There are four processes. 1 Spout (Read
>>> from Kafka) and 3 bolts (Transformation and sink) .
>>> Flink. Operator level parallelism not set. Task Parallelism is set as 1.
>>> Task slot is 1 per core.
>>>
>>> Storm was 130 milliseconds faster in 1st record.
>>> Storm was 20 seconds faster in 1 millionth record.
>>>
>>> *Test 2*
>>> Storm Parallelism is set as 1. There are four processes. 1 Spout (Read
>>> from Kafka) and 3 bolts (Transformation and sink)
>>> Flink. Operator level parallelism not set. Task Parallelism is set as 4.
>>> Task slot is 1 per core. So all cores is used.
>>>
>>> Storm was 180 milliseconds faster in 1st record.
>>> Storm was 25 seconds faster in 1 millionth record.
>>>
>>> *Observations here*
>>> 1) Increasing Parallelism did not increase the performance in Flink
>>> rather it 

Flink 1.11 throws Unrecognized field "error_code"

2020-07-16 Thread Lian Jiang
Hi,

I am using java 1.8 and Flink 1.11 by following
https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html
on my MAC Mojave 10.14.6. But "

./bin/flink run examples/streaming/WordCount.jar

" throw below error. These are the java versions that I tried:

 1.8.0_222, x86_64: "AdoptOpenJDK 8"
/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home
1.8.0_221, x86_64: "Java SE 8"
/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home
1.8.0_181, x86_64: "Java SE 8"
/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home

They all behave the same. Any idea is highly appreciated!


Error:

Caused by:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
Unrecognized field "error_code" (class
org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody), not
marked as ignorable (one known property: "jobUrl"])
 at [Source: UNKNOWN; line: -1, column: -1] (through reference chain:
org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody["error_code"])
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:840)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1192)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1592)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1542)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:504)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4173)
at
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2536)
at
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:382)


Re: Flink yarn session exception

2020-07-16 Thread 忝忝向仧
你可以看看lib里面的包跟官网的要求是不是一样的



发自我的iPhone


-- Original --
From: Rainie Li https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
 <
 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
 
 Best,
 Paul Lam

  2020年7月16日 15:46,Rainie Li 

CEP use case ?

2020-07-16 Thread Aissa Elaffani
Hello Guys,
I have some sensors  generating some data about their (température,
humidity, positioning , ...) and I want to apply some rules (simple
conditions, if température>25, ...), in order to define if the sensor is on
"Normal" status or "Alerte" status. Do i need to use flink CEP, or just the
DataStream Api is suffisant to define the status of each sensor.
Sorry for disturbing you ! I hope someone can help me with that.
Thank you guys.
Best
Aissa


Re: Flink Pojo Serialization for Map Values

2020-07-16 Thread Theo Diefenthal
hi Krzysztof,

That's why my goal is to always set 
   env.getConfig().disableGenericTypes();
in my streaming jobs. This way, you will receive an early crash if GenericTypes 
are used somewhere. (They are bad for the performance so I try to avoid them 
everywhere).

Sadly, if you build up streaming jobs based on kafka, you must have Flink 
1.10.1 or flink 1.11 to be able to have this setting set. ( 
https://issues.apache.org/jira/browse/FLINK-15904 ). 

As my project is still on Flink 1.9, I currently have an e2e-like test with 
"disableGenericTypes", which differs from the real job only by not using kafka 
consumer and instead have another input, so I can make sure the remaining part 
of my pipeline has no generic types within. 

Best regards
Theo



- Ursprüngliche Mail -
Von: "KristoffSC" 
An: "user" 
Gesendet: Donnerstag, 16. Juli 2020 20:57:09
Betreff: Re: Flink Pojo Serialization for Map Values

Theo,
thank you for clarification and code examples. 
I was actually suspectign that this is becase the Java type erasure.s

The thing that bothers me though is fact that Flink was failing over to Kryo
silently in my case. Without any information in the logs. And actually we
found it just by luck.




Beam flink runner job not keeping up with input rate after downscaling

2020-07-16 Thread Kathula, Sandeep
Hi,
   We started a Beam application with Flink runner with parallelism as 50. It 
is a stateful application which uses RocksDB as state store. We are using 
timers and Beam’s value state and bag state (which is same as List state of 
Flink). We are doing incremental checkpointing. With initial parallelism of 50, 
our application is able to process up to 50,000 records per second. After a 
week, we took a savepoint and restarted from savepoint with the parallelism of 
18. We are seeing that our application is only able to process 7000 records per 
second. Records processed per task manager was almost half of what is used to 
process previously with 50 task managers.

We didn’t give any maxParallelism in our Beam application but found from logs 
that maxParallelism has been set to -1. Also Beam’s doc for Flink runner 
mentiones by default maxParallelism is -1 
https://beam.apache.org/documentation/runners/flink/

But this Flink doc 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html 
mentions that by default maxParallelism is set to operatorParallelism + 
(operatorParallelism / 2) which would be 75 in our case.

I didn’t get how maxParallelism is set (when giving maxParallelism as -1 to 
Beam’s Flink runner). I highly doubt more key groups is causing this 
performance degradation?

Beam version - 2.19
Flink version- 1.9

Any suggestions/help would be appreciated.


Thanks
Sandeep Kathula




Re: Question on Pattern Matching

2020-07-16 Thread Chesnay Schepler
Have you read this part of the documentation 
?


From what I understand, it provides you hooks for processing 
matched/timed out patterns.



On 16/07/2020 20:23, Basanth Gowda wrote:

Hello,

We have a use case where we want to know when a certain pattern 
doesn't complete within a given time frame.


For Example A -> B -> C -> D (needs to complete in 10 minutes)

Now with Flink if event D doesn't happen in 10 minutes, the pattern is 
discarded and we can get notified. We also want to track how many of 
them completed (even if they meet SLA). How do we achieve this with 
FLINK CEP or other mechanisms?


thank you,
Basanth





Re: Using md5 hash while sinking files to s3

2020-07-16 Thread Chesnay Schepler
I only quickly skimmed the Hadoop docs and found this (although it is 
not documented very well I might add). If this does not do the trick, 
I'd suggest to reach out to the Hadoop project, since we're using their 
S3 filesystem.


On 16/07/2020 19:32, nikita Balakrishnan wrote:

Hey Chesnay,

Thank you for getting back with that! I tried setting that too, it 
still gives me the same exception. Is there something else that I'm 
missing?
I also have 
fs.s3a.bucket..server-side-encryption-algorithm=SSE-KMS 
and fs.s3a.bucket..server-side-encryption.key set.


Is there no need to set the md5 hash value manually while sinking? The 
fs.s3a.etag.checksum.enabled: true will do it for me? And Do I need to 
specify anywhere that we have to use md5 hashing?



On Thu, Jul 16, 2020 at 12:04 AM Chesnay Schepler > wrote:


Please try configuring :

fs.s3a.etag.checksum.enabled: true


On 16/07/2020 03:11, nikita Balakrishnan wrote:

Hello team,

I’m developing a system where we are trying to sink to an immutable s3
bucket. This bucket has server side encryption set as KMS. The DataStream
sink works perfectly fine when I don’t use the immutable bucket but when I
use an immutable bucket, I get exceptions regarding multipart upload
failures. It says we need to enable md5 hashing for the put object to work.

Here’s the stack trace:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at

org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
at

org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at

org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at

org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
java.io.IOException: Uploading parts failed
... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
at

org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
at

org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
at

org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
at

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
at

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
at

org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
at

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on
raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt:
com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header
is required for Put Part requests with Object Lock parameters (Service:
Amazon S3; Status Code: 400; Error Code: 

Re: backup configuration in Flink doc

2020-07-16 Thread Chesnay Schepler
They should be public yes; I do not know what the "Backup" category is 
supposed to mean, and I suspect this was a WIP title.


On 16/07/2020 18:01, Steven Wu wrote:


The configuration page has this "backup" section. Can I assume that 
they are public interfaces? The name "backup" is a little confusing to 
me. There are some important pipeline and execution checkpointing 
configs here.

https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#backup

Thanks,
Steven





Re: Flink Pojo Serialization for Map Values

2020-07-16 Thread KristoffSC
Theo,
thank you for clarification and code examples. 
I was actually suspectign that this is becase the Java type erasure.s

The thing that bothers me though is fact that Flink was failing over to Kryo
silently in my case. Without any information in the logs. And actually we
found it just by luck.






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Question on Pattern Matching

2020-07-16 Thread Basanth Gowda
Hello,

We have a use case where we want to know when a certain pattern doesn't
complete within a given time frame.

For Example A -> B -> C -> D (needs to complete in 10 minutes)

Now with Flink if event D doesn't happen in 10 minutes, the pattern is
discarded and we can get notified. We also want to track how many of them
completed (even if they meet SLA). How do we achieve this with FLINK CEP or
other mechanisms?

thank you,
Basanth


Re: Hadoop FS when running standalone

2020-07-16 Thread Lorenzo Nicora
Thanks Alessandro,

I think I solved it.
I cannot set any HADOOP_HOME as I have no Hadoop installed on the machine
running my tests.
But adding *org.apache.flink:flink-shaded-hadoop-2:2.8.3-10.0* as a compile
dependency to the Maven profile building the standalone version fixed the
issue.

Lorenzo


On Thu, 16 Jul 2020 at 15:35, Alessandro Solimando <
alessandro.solima...@gmail.com> wrote:

> Hi Lorenzo,
> IIRC I had the same error message when trying to write snappified parquet
> on HDFS with a standalone fat jar.
>
> Flink could not "find" the hadoop native/binary libraries (specifically I
> think for me the issue was related to snappy), because my HADOOP_HOME was
> not (properly) set.
>
> I have never used S3 so I don't know if what I mentioned could be the
> problem here too, but worth checking.
>
> Best regards,
> Alessandro
>
> On Thu, 16 Jul 2020 at 12:59, Lorenzo Nicora 
> wrote:
>
>> Hi
>>
>> I need to run my streaming job as a *standalone* Java application, for
>> testing
>> The job uses the Hadoop S3 FS and I need to test it (not a unit test).
>>
>> The job works fine when deployed (I am using AWS Kinesis Data Analytics,
>> so Flink 1.8.2)
>>
>> I have *org.apache.flink:flink-s3-fs-hadoop* as a "compile" dependency.
>>
>> For running standalone, I have a Maven profile adding dependencies that
>> are normally provided (
>> *org.apache.flink:flink-java*,
>> *org.apache.flink:flink-streaming-java_2.11*,
>> *org.apache.flink:flink-statebackend-rocksdb_2.11*,
>> *org.apache.flink:flink-connector-filesystem_2.11*) but I keep getting
>> the error "Hadoop is not in the classpath/dependencies" and it does not
>> work.
>> I tried adding *org.apache.flink:flink-hadoop-fs* with no luck
>>
>> What dependencies am I missing?
>>
>> Cheers
>> Lorenzo
>>
>


Re: Using md5 hash while sinking files to s3

2020-07-16 Thread nikita Balakrishnan
Hey Chesnay,

Thank you for getting back with that! I tried setting that too, it still
gives me the same exception. Is there something else that I'm missing?
I also have fs.s3a.bucket..server-side-encryption-algorithm=SSE-KMS
and fs.s3a.bucket..server-side-encryption.key set.

Is there no need to set the md5 hash value manually while sinking? The
fs.s3a.etag.checksum.enabled:
true will do it for me? And Do I need to specify anywhere that we have to
use md5 hashing?


On Thu, Jul 16, 2020 at 12:04 AM Chesnay Schepler 
wrote:

> Please try configuring :
>
> fs.s3a.etag.checksum.enabled: true
>
>
> On 16/07/2020 03:11, nikita Balakrishnan wrote:
>
> Hello team,
>
> I’m developing a system where we are trying to sink to an immutable s3
> bucket. This bucket has server side encryption set as KMS. The DataStream
> sink works perfectly fine when I don’t use the immutable bucket but when I
> use an immutable bucket, I get exceptions regarding multipart upload
> failures. It says we need to enable md5 hashing for the put object to work.
>
> Here’s the stack trace:
>
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
> exception while processing timer.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
> java.io.IOException: Uploading parts failed
> ... 11 common frames omitted
> Caused by: java.io.IOException: Uploading parts failed
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
> at
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
> ... 10 common frames omitted
> Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on
> raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt:
> com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header
> is required for Put Part requests with Object Lock parameters (Service:
> Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx;
> S3 Extended Request ID: ), S3 Extended Request ID: xx
> :InvalidRequest: Content-MD5 HTTP header is required for Put Part requests
> with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error
> Code: InvalidRequest; Request ID: ; S3 Extended Request ID: )
> at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
> at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
> at 

Re: flink 1.11任务提交的问题

2020-07-16 Thread Leonard Xu
Hi,

我理解目前好像做不到, cc: godfrey 大佬看看

祝好,
Leonard Xu

> 在 2020年7月16日,23:08,sunfulin  写道:
> 
> hi,
> 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql 
> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> 通过StreamExecutionEnvironment.execute提交,yarn 
> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。



backup configuration in Flink doc

2020-07-16 Thread Steven Wu
The configuration page has this "backup" section. Can I assume that they
are public interfaces? The name "backup" is a little confusing to me. There
are some important pipeline and execution checkpointing configs here.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#backup

Thanks,
Steven


Flink Jobs are failing for running testcases when trying to build in Jenkins server

2020-07-16 Thread bujjirahul45
Hi,

I am trying to build flink job in Jenkins server and when its running the
testcases its giving me below i am doing a simple pattern validation, where
i am testing data against a set of patterns its build fine in local with
gradle 6.3 but trying to build in Jenkins server its giving below is stack
trace please suggest me what i am doing wrong

SignalPatternDefinitionMatchingTest >
testCompareInputAndOutputDataForInValidSignal() FAILED
java.lang.Exception: Could not create actor system
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:278)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:164)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRpcService(AkkaRpcServiceUtils.java:126)
at
org.apache.flink.runtime.metrics.util.MetricUtils.startMetricsRpcService(MetricUtils.java:139)
at
org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:286)
at
org.apache.flink.client.deployment.executors.LocalExecutor.startMiniCluster(LocalExecutor.java:117)
at
org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:63)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1733)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1634)
at
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at
com.myorg.pattern.service.TestPatternProcessingService.getInValidSignalDataStreamOutput(TestPatternProcessingService.java:140)
at
com.myorg.pattern.pattern.SignalPatternDefinitionMatchingTest.testCompareInputAndOutputDataForInValidSignal(SignalPatternDefinitionMatchingTest.java:24)
at
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
at
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at

flink 1.11任务提交的问题

2020-07-16 Thread sunfulin
hi,
请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql 
dml提交(executeSQL执行),又通过DataStream.addSink来写出,
通过StreamExecutionEnvironment.execute提交,yarn 
per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。

UnsupportedOperatorException with TensorFlow on checkpointing

2020-07-16 Thread Sung Gon Yi
Hi,

Following codes have a UnsupportedOperatorException on checkpointing (every 
time).
Could you suggest any solution?

Example code:
A.java
--
public class A extends RichWindowFunction 
{
private transient MapState state;

@Override
public void apply(String key, GlobalWindow window, Iterable 
elements, Collector out) {
String k; 
...
B b = state.get(k); // error reported point (eg. 58th line)
...
}

…
}

B.java
-
...
import org.tensorflow.SavedModelBundle;
import org.tensorflow.framework.*;

public class B {
...
private SavedModelBundle savedModel; 
private MetaGraph metaGraph;

public void initialize(String path, ConfigProto config, String... tags) 
throws Exception {
...
this.savedModel = 
SavedModelBundle.loader(path).withTags(tags).withConfigProto(config.toByteArray()).load();
this.metaGraph = MetaGraph.parseFrom(this.saveModel.metaGraphDef());
...
}
}

Error  logs:
--
2020-07-07 15:55:30,144 INFO  org.apache.flink.runtime.taskmanager.Task 
- Window(GlobalWindows(), CountTrigger, TimeCountEvictor, 
SequentialOperator) -> asset_score-evaluate -> Sink: Unnamed (1/1) 
(96fcb5ceb56bc95177b27697cc264edf) switched from RUNNING to FAILED.
om.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
field_ (com.google.protobuf.DescriptorProtos$DescriptorProto)
proto (com.google.protobuf.Descriptors$Descriptor)
messageType (com.google.protobuf.Descriptors$FieldDescriptor)
fields (com.google.protobuf.Descriptors$Descriptor)
messageTypes (com.google.protobuf.Descriptors$FileDescriptor)
dependencies (com.google.protobuf.Descriptors$FileDescriptor)
dependencies (com.google.protobuf.Descriptors$FileDescriptor)
file (com.google.protobuf.Descriptors$FieldDescriptor)
fields (com.google.protobuf.Descriptors$Descriptor)
containingType (com.google.protobuf.Descriptors$Descriptor)
descriptor (com.google.protobuf.MapEntry$Metadata)
metadata (com.google.protobuf.MapEntry)
defaultEntry (com.google.protobuf.MapField$ImmutableMessageConverter)
converter (com.google.protobuf.MapField)
deviceCount_ (org.tensorflow.framework.ConfigProto)
…
metaGraph_(org.tensorflow.framework.MetaGraphDef)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:368)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:368)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:368)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at 

????

2020-07-16 Thread ????


Re: Flink 1.11 Hive Streaming Write的问题

2020-07-16 Thread 李佳宸
好的,谢谢~~~

JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道:

> hi
> 需要开启checkpoint
>
>
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年07月16日 18:03,李佳宸 写道:
> 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
> 批量的hive写入,流环境的读取是正常的。
>
> 附代码,很简短:
>
> public class KafkaToHiveStreaming {
>public static void main(String[] arg) throws Exception{
>StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings bsSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
>String name= "myhive";
>String defaultDatabase = "default";
>String hiveConfDir =
> "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> path
>String version = "3.1.2";
>
>HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> hiveConfDir, version);
>bsTableEnv.registerCatalog("myhive", hive);
>bsTableEnv.useCatalog("myhive");
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>bsTableEnv.executeSql("CREATE TABLE topic_products (" +
>"  id BIGINT ," +
>"  order_id STRING," +
>"  amount DECIMAL(10, 2)," +
>"  create_time TIMESTAMP " +
>") WITH (" +
>" 'connector' = 'kafka'," +
>" 'topic' = 'order.test'," +
>" 'properties.bootstrap.servers' = 'localhost:9092'," +
>" 'properties.group.id' = 'testGroup'," +
>" 'scan.startup.mode' = 'earliest-offset', " +
>" 'format' = 'json'  " +
>")");
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
>bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
>"  id BIGINT ," +
>"  order_id STRING," +
>"  amount DECIMAL(10, 2)" +
>"  )");
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>bsTableEnv.executeSql("CREATE TABLE print_table WITH
> ('connector' = 'print')" +
>"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> ALL)");
>
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT
> " +
>"id, " +
>"order_id, " +
>"amount " +
>"FROM topic_products");
>
>Table table1 = bsTableEnv.from("hive_sink_table_streaming");
>table1.executeInsert("print_table");
>}
> }
>


Re: Performance test Flink vs Storm

2020-07-16 Thread Prasanna kumar
Xintong Song,


   - Which version of Flink is used?*1.10*
   - Which deployment mode is used? *Standalone*
   - Which cluster mode is used? *Job*
   - Do you mean you have a 4core16gb node for each task manager, and each
   task manager has 4 slots? *Yeah*. *There are totally 3 taskmanagers in
   the cluster.  2TMs are t2.medium machine 2 core 4 gb per machine. 1 slot
   per core. 1TM is t2.large 4core 16gb . 4slots in the machine. There were
   other jobs running in the t2.medium TMs. T2.large machine is where the
   performance testing job was running. *
   - Sounds like you are running a streaming job without using any state.
   Have you tuned the managed memory fraction
   (`taskmanager.memory.managed.fraction`) to zero as suggested in the
   document[1]?  *No i have not set the
   taskmanager.memory.network.fraction to 0. I had set Checkpoint to use the
   Job manager backend. *
   - *The CPU maximum spike i spotted was 40%. *

*Between i did some latest test only on t2.medium machine with 2 slots per
core. 1million records with 10k/s ingestion rate. Parallelism was 1. *
*I added rebalance to the inputstream.   ex: *inputStream.rebalance().map()
*I was able to get latency in the range 130ms - 2sec.*

Let me also know if there are more things to consider here.

Thanks
Prasanna.

On Thu, Jul 16, 2020 at 4:04 PM Xintong Song  wrote:

> Hi Prasanna,
>
> Trying to understand how Flink is deployed.
>
>- Which version of Flink is used?
>- Which deployment mode is used? (Standalone/Kubernetes/Yarn/Mesos)
>- Which cluster mode is used? (Job/Session)
>- Do you mean you have a 4core16gb node for each task manager, and
>each task manager has 4 slots?
>- Sounds like you are running a streaming job without using any state.
>Have you tuned the managed memory fraction
>(`taskmanager.memory.managed.fraction`) to zero as suggested in the
>document[1]?
>
> When running a stateless job or using a heap state backend
>> (MemoryStateBackend or FsStateBackend), set managed memory to zero.
>>
>
> I can see a few potential problems.
>
>- Managed memory is probably not configured. That means a significant
>fraction of memory is unused.
>- It sounds like the CPU processing time is not the bottleneck. Thus
>increasing the parallelism will not give you better performance, but will
>on the other hand increase the overhead load on the task manager.
>
> Also pulled in Becket Qin, who is the expert of Kafka connectors. Since
> you have observed lack of performance in reading from Kafka compared to
> Storm.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_tuning.html#heap-state-backend
>
> On Thu, Jul 16, 2020 at 10:35 AM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Hi
>>
>> Sending to you all separately as you answered one of my earlier query.
>>
>> Thanks,
>> Prasanna.
>>
>>
>> -- Forwarded message -
>> From: Prasanna kumar 
>> Date: Wed 15 Jul, 2020, 23:27
>> Subject: Performance test Flink vs Storm
>> To: , user 
>>
>>
>> Hi,
>>
>> We are testing flink and storm for our streaming pipelines on various
>> features.
>>
>> In terms of Latency,i see the flink comes up short on storm even if more
>> CPU is given to it. Will Explain in detail.
>>
>> *Machine*. t2.large 4 core 16 gb. is used for Used for flink task
>> manager and storm supervisor node.
>> *Kafka Partitions* 4
>> *Messages tested:* 1million
>> *Load* : 50k/sec
>>
>> *Scenario*:
>> Read from Kafka -> Transform (Map to a different JSON format) - > Write
>> to a Kafka topic.
>>
>> *Test 1*
>> Storm Parallelism is set as 1. There are four processes. 1 Spout (Read
>> from Kafka) and 3 bolts (Transformation and sink) .
>> Flink. Operator level parallelism not set. Task Parallelism is set as 1.
>> Task slot is 1 per core.
>>
>> Storm was 130 milliseconds faster in 1st record.
>> Storm was 20 seconds faster in 1 millionth record.
>>
>> *Test 2*
>> Storm Parallelism is set as 1. There are four processes. 1 Spout (Read
>> from Kafka) and 3 bolts (Transformation and sink)
>> Flink. Operator level parallelism not set. Task Parallelism is set as 4.
>> Task slot is 1 per core. So all cores is used.
>>
>> Storm was 180 milliseconds faster in 1st record.
>> Storm was 25 seconds faster in 1 millionth record.
>>
>> *Observations here*
>> 1) Increasing Parallelism did not increase the performance in Flink
>> rather it became 50ms to 5s slower.
>> 2) Flink is slower in Reading from Kafka compared to storm. Thats where
>> the bulk of the latency is.  for the millionth record its 19-24 seconds
>> slower.
>> 3) Once message is read, flink takes lesser time to transform and write
>> to kafka compared to storm.
>>
>> *Other Flink Config*
>> jobmanager.heap.size: 1024m
>>
>> taskmanager.memory.process.size: 1568m
>>
>> *How do we improve the latency ? *
>> *Why does latency becomes worse when parallelism is increased and matched
>> to 

Re: Hadoop FS when running standalone

2020-07-16 Thread Alessandro Solimando
Hi Lorenzo,
IIRC I had the same error message when trying to write snappified parquet
on HDFS with a standalone fat jar.

Flink could not "find" the hadoop native/binary libraries (specifically I
think for me the issue was related to snappy), because my HADOOP_HOME was
not (properly) set.

I have never used S3 so I don't know if what I mentioned could be the
problem here too, but worth checking.

Best regards,
Alessandro

On Thu, 16 Jul 2020 at 12:59, Lorenzo Nicora 
wrote:

> Hi
>
> I need to run my streaming job as a *standalone* Java application, for
> testing
> The job uses the Hadoop S3 FS and I need to test it (not a unit test).
>
> The job works fine when deployed (I am using AWS Kinesis Data Analytics,
> so Flink 1.8.2)
>
> I have *org.apache.flink:flink-s3-fs-hadoop* as a "compile" dependency.
>
> For running standalone, I have a Maven profile adding dependencies that
> are normally provided (
> *org.apache.flink:flink-java*,
> *org.apache.flink:flink-streaming-java_2.11*,
> *org.apache.flink:flink-statebackend-rocksdb_2.11*,
> *org.apache.flink:flink-connector-filesystem_2.11*) but I keep getting
> the error "Hadoop is not in the classpath/dependencies" and it does not
> work.
> I tried adding *org.apache.flink:flink-hadoop-fs* with no luck
>
> What dependencies am I missing?
>
> Cheers
> Lorenzo
>


Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread wangl...@geekplus.com.cn

 谢谢,我理解了。



wangl...@geekplus.com.cn 

Sender: Harold.Miao
Send Time: 2020-07-16 19:33
Receiver: user-zh
Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码
 
private static  T findSingleInternal(
  Class factoryClass,
  Map properties,
  Optional classLoader) {
 
   List tableFactories = discoverFactories(classLoader);
   List filtered = filter(tableFactories, factoryClass, properties);
 
   if (filtered.size() > 1) {
  throw new AmbiguousTableFactoryException(
 filtered,
 factoryClass,
 tableFactories,
 properties);
   } else {
  return filtered.get(0);
   }
}
 
private static List
discoverFactories(Optional classLoader) {
   try {
  List result = new LinkedList<>();
  ClassLoader cl =
classLoader.orElse(Thread.currentThread().getContextClassLoader());
  ServiceLoader
 .load(TableFactory.class, cl)
 .iterator()
 .forEachRemaining(result::add);
  return result;
   } catch (ServiceConfigurationError e) {
  LOG.error("Could not load service provider for table factories.", e);
  throw new TableException("Could not load service provider for
table factories.", e);
   }
 
}
 
 
wangl...@geekplus.com.cn  于2020年7月16日周四 下午7:04写道:
 
>
> 我在
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> 找到了 SPI 的配置:
>
> org.apache.flink.formats.json.JsonFileSystemFormatFactory
> org.apache.flink.formats.json.JsonFormatFactory
> org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> org.apache.flink.formats.json.canal.CanalJsonFormatFactory
>
> 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> 代码没找到类似的关系映射配置。
>
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: godfrey he
> Send Time: 2020-07-16 16:38
> Receiver: user-zh
> Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
>
> Best,
> Godfrey
>
> wangl...@geekplus.com.cn  于2020年7月16日周四
> 下午4:02写道:
>
> > 比如:
> >
> > CREATE TABLE my_table (
> >   id BIGINT,
> >  first_name STRING,
> >  last_name STRING,
> >  email STRING
> > ) WITH (
> >  'connector'='kafka',
> >  'topic'='user_topic',
> >  'properties.bootstrap.servers'='localhost:9092',
> >  'scan.startup.mode'='earliest-offset',
> >  'format'='debezium-json'
> > );
> >
> > 最终解析 debezium-json 应该是
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > 下面的代码
> > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> >
> > 谢谢,
> > 王磊
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
>
 
 
-- 
 
Best Regards,
Harold Miao


AllwindowStream and RichReduceFunction

2020-07-16 Thread Flavio Pompermaier
Hi to all,
I'm trying to apply a rich reduce function after a countWindowAll but Flink
says
"ReduceFunction of reduce can not be a RichFunction. Please use
reduce(ReduceFunction, WindowFunction) instead."

Is there any good reason for this? Or am I doing something wrong?

Best,
Flavio


Re: flink 1.11 checkpoint使用

2020-07-16 Thread godfrey he
为什么要 GROUP BY id,name ,description, weight ?
直接 "INSERT INTO sink SELECT  id,name ,description, weight FROM
debezium_source" 不能满足需求?

曹武 <14701319...@163.com> 于2020年7月16日周四 下午9:30写道:

> 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
> 从checkpoint恢复以后,新来op=d的数据会删除失败
> 重启命令:./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
>
> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
> 代码:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //
> 最大允许同时出现几个CheckPoint
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); //
> 最小得间隔时间
> env.getCheckpointConfig().setPreferCheckpointForRecovery(true); //
> 是否倾向于用CheckPoint做故障恢复
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
> //
> 容忍多少次CheckPoint失败
> //Checkpoint文件清理策略
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> //Checkpoint外部文件路径
> env.setStateBackend(new FsStateBackend(new
> URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
> TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> settings);
> String sourceDDL = String.format(
> "CREATE TABLE debezium_source (" +
> " id INT NOT NULL," +
> " name STRING," +
> " description STRING," +
> " weight Double" +
> ") WITH (" +
> " 'connector' = 'kafka-0.11'," +
> " 'topic' = '%s'," +
> " 'properties.bootstrap.servers' = '%s'," +
> " 'scan.startup.mode' = 'group-offsets'," +
> " 'format' = 'debezium-json'" +
> ")", "ddd", " 172.22.20.206:9092");
> String sinkDDL = "CREATE TABLE sink (" +
> " id INT NOT NULL," +
> " name STRING," +
> " description STRING," +
> " weight Double," +
> " PRIMARY KEY (id,name, description,weight) NOT ENFORCED "
> +
> ") WITH (" +
> " 'connector' = 'jdbc'," +
> " 'url' =
> 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
> " 'table-name' = 'products'," +
> " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
> " 'username'='DataPip'," +
> " 'password'='DataPip'" +
> ")";
> String dml = "INSERT INTO sink SELECT  id,name ,description, weight
> FROM debezium_source GROUP BY id,name ,description, weight";
> tEnv.executeSql(sourceDDL);
> tEnv.executeSql(sinkDDL);
> tEnv.executeSql(dml);
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink 1.11 checkpoint使用

2020-07-16 Thread 曹武
我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
从checkpoint恢复以后,新来op=d的数据会删除失败
重启命令:./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
代码:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //
最大允许同时出现几个CheckPoint
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); //
最小得间隔时间
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); //
是否倾向于用CheckPoint做故障恢复
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); //
容忍多少次CheckPoint失败
//Checkpoint文件清理策略
   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//Checkpoint外部文件路径
env.setStateBackend(new FsStateBackend(new
URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
settings);
String sourceDDL = String.format(
"CREATE TABLE debezium_source (" +
" id INT NOT NULL," +
" name STRING," +
" description STRING," +
" weight Double" +
") WITH (" +
" 'connector' = 'kafka-0.11'," +
" 'topic' = '%s'," +
" 'properties.bootstrap.servers' = '%s'," +
" 'scan.startup.mode' = 'group-offsets'," +
" 'format' = 'debezium-json'" +
")", "ddd", " 172.22.20.206:9092");
String sinkDDL = "CREATE TABLE sink (" +
" id INT NOT NULL," +
" name STRING," +
" description STRING," +
" weight Double," +
" PRIMARY KEY (id,name, description,weight) NOT ENFORCED " +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' =
'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
" 'table-name' = 'products'," +
" 'driver'= 'com.mysql.cj.jdbc.Driver'," +
" 'username'='DataPip'," +
" 'password'='DataPip'" +
")";
String dml = "INSERT INTO sink SELECT  id,name ,description, weight
FROM debezium_source GROUP BY id,name ,description, weight";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
tEnv.executeSql(dml);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Byte arrays in Avro

2020-07-16 Thread Timo Walther
I further investigated this issue. We are analyzing the class as a POJO 
in another step here which produces the warning:


https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java#L71

However, the serializer is definitely the `AvroSerializer` if the type 
information is `AvroTypeInfo`. You can check that via `dataStream.getType`.


I hope this helps.

Regards,
Timo

On 16.07.20 14:28, Timo Walther wrote:

Hi Lasse,

are you using Avro specific records? A look into the code shows that the 
warnings in the log are generated after the Avro check:


https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java#L1741 



Somehow your Avro object is not recognized correctly?

Regards,
Timo

On 16.07.20 13:28, Lasse Nedergaard wrote:

Hi.

We have some Avro objects and some of them contain the primitive data 
type bytes and it's translated into java.nio.ByteBuffer in the Avro 
objects. When using our Avro object we get these warnings:


org.apache.flink.api.java.typeutils.TypeExtractor [] - class 
java.nio.ByteBuffer does not contain a getter for field hb
org.apache.flink.api.java.typeutils.TypeExtractor [] - class 
java.nio.ByteBuffer does not contain a setter for field hb
org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class 
java.nio.ByteBuffer cannot be used as a POJO type because not all 
fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" 
for details of the effect on performance.


and it's correct that ByteBuffer doesn't contain getter and setter for 
hb.


Flink documentation said "Note that Flink is automatically serializing 
POJOs generated by Avro with the Avro serializer.", but when I debug 
it looks like it fails back to generic type for the byte buffer and it 
therefore make sense with the warnings.


I want to ensure we are running as effectively as possible.

So my questions are:
1. What is the most optimal way to transport byte arrays in Avro in 
Flink.
2. Do Flink use Avro serializer for our Avro object when they contain 
ByteBuffer?



Thanks

Lasse Nedergaard






Re: Byte arrays in Avro

2020-07-16 Thread Timo Walther

Hi Lasse,

are you using Avro specific records? A look into the code shows that the 
warnings in the log are generated after the Avro check:


https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java#L1741

Somehow your Avro object is not recognized correctly?

Regards,
Timo

On 16.07.20 13:28, Lasse Nedergaard wrote:

Hi.

We have some Avro objects and some of them contain the primitive data 
type bytes and it's translated into java.nio.ByteBuffer in the Avro 
objects. When using our Avro object we get these warnings:


org.apache.flink.api.java.typeutils.TypeExtractor [] - class 
java.nio.ByteBuffer does not contain a getter for field hb
org.apache.flink.api.java.typeutils.TypeExtractor [] - class 
java.nio.ByteBuffer does not contain a setter for field hb
org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class 
java.nio.ByteBuffer cannot be used as a POJO type because not all fields 
are valid POJO fields, and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of 
the effect on performance.


and it's correct that ByteBuffer doesn't contain getter and setter for hb.

Flink documentation said "Note that Flink is automatically serializing 
POJOs generated by Avro with the Avro serializer.", but when I debug it 
looks like it fails back to generic type for the byte buffer and it 
therefore make sense with the warnings.


I want to ensure we are running as effectively as possible.

So my questions are:
1. What is the most optimal way to transport byte arrays in Avro in Flink.
2. Do Flink use Avro serializer for our Avro object when they contain 
ByteBuffer?



Thanks

Lasse Nedergaard




Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Congxian Qiu
Hi Peihui

感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug
日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可,
非常感谢~

[1] https://gist.github.com/

Best,
Congxian


Peihui He  于2020年7月16日周四 下午5:54写道:

> Hi Yun,
>
> 我这边测试需要在集群上跑的,本地idea跑是没有问题的。
> flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。
>
> 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。
>
> Peihui He  于2020年7月16日周四 下午5:26写道:
>
>> Hi Yun,
>>
>> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
>>
>> Best wishes.
>>
>> Yun Tang  于2020年7月16日周四 下午5:04写道:
>>
>>> Hi Peihui
>>>
>>> Flink-1.10.1
>>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
>>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
>>> 祝好
>>> 唐云
>>> 
>>> From: Peihui He 
>>> Sent: Thursday, July 16, 2020 16:15
>>> To: user-zh@flink.apache.org 
>>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>>
>>> Hi Yun,
>>>
>>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
>>> 输入的特定的word抛出runtimeexception 使task
>>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
>>>
>>> Caused by: java.nio.file.NoSuchFileException:
>>>
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>>> ->
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>>>
>>> 情况和@chenxyz 类似。
>>>
>>> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
>>>
>>> 换成1.10.1 就可以了
>>>
>>> Best wishes.
>>>
>>> Yun Tang  于2020年7月15日周三 下午4:35写道:
>>>
>>> > Hi Robin
>>> >
>>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
>>> >
>>> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
>>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>>> >
>>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root
>>> cause,还请在日志中找一下无法恢复的root
>>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>>> >
>>> >
>>> > [1]
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>>> > [2]
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>>> >
>>> > 祝好
>>> > 唐云
>>> >
>>> >
>>> > 
>>> > From: Robin Zhang 
>>> > Sent: Wednesday, July 15, 2020 16:23
>>> > To: user-zh@flink.apache.org 
>>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>> >
>>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>>> >
>>> > Best
>>> > Robin Zhang
>>> > 
>>> > From: Peihui He <[hidden email]>
>>> > Sent: Tuesday, July 14, 2020 10:42
>>> > To: [hidden email] <[hidden email]>
>>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>> >
>>> > hello,
>>> >
>>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>>> >
>>> >
>>> > Caused by: java.nio.file.NoSuchFileException:
>>> >
>>> >
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>>> > ->
>>> >
>>> >
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>>> >
>>> > 配置和1.9.2 一样:
>>> > state.backend: rocksdb
>>> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
>>> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
>>> > state.backend.incremental: true
>>> >
>>> > 代码上都有
>>> >
>>> > env.enableCheckpointing(1);
>>> >
>>> >
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
>>> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>>> >
>>> >
>>> >   是1.10.0 需要做什么特别配置么?
>>> >
>>> >
>>> >
>>> > --
>>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>>> >
>>>
>>


回复:Flink 1.11 Hive Streaming Write的问题

2020-07-16 Thread JasonLee
hi
需要开启checkpoint


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月16日 18:03,李佳宸 写道:
想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
批量的hive写入,流环境的读取是正常的。

附代码,很简短:

public class KafkaToHiveStreaming {
   public static void main(String[] arg) throws Exception{
   StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
   EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
   StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
   String name= "myhive";
   String defaultDatabase = "default";
   String hiveConfDir =
"/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
path
   String version = "3.1.2";

   HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
   bsTableEnv.registerCatalog("myhive", hive);
   bsTableEnv.useCatalog("myhive");
   bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
   bsTableEnv.executeSql("CREATE TABLE topic_products (" +
   "  id BIGINT ," +
   "  order_id STRING," +
   "  amount DECIMAL(10, 2)," +
   "  create_time TIMESTAMP " +
   ") WITH (" +
   " 'connector' = 'kafka'," +
   " 'topic' = 'order.test'," +
   " 'properties.bootstrap.servers' = 'localhost:9092'," +
   " 'properties.group.id' = 'testGroup'," +
   " 'scan.startup.mode' = 'earliest-offset', " +
   " 'format' = 'json'  " +
   ")");
   bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

   bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
   "  id BIGINT ," +
   "  order_id STRING," +
   "  amount DECIMAL(10, 2)" +
   "  )");
   bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
   bsTableEnv.executeSql("CREATE TABLE print_table WITH
('connector' = 'print')" +
   "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");

   bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
   bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
   "id, " +
   "order_id, " +
   "amount " +
   "FROM topic_products");

   Table table1 = bsTableEnv.from("hive_sink_table_streaming");
   table1.executeInsert("print_table");
   }
}


Re: Flink on k8s 中,Jar 任务 avatica-core 依赖和 flink-table jar 冲突问题

2020-07-16 Thread Congxian Qiu
Hi

你的图挂了,如果单纯想解决 jar 包冲突的问题,那么 maven shade plugin[1] 或许对你有用

[1]
https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html
Best,
Congxian


LakeShen  于2020年7月16日周四 下午6:03写道:

> Hi 社区,
>
> 我现在正在迁移任务到 k8s ,目前版本为 Flink 1.6 版本,k8s 上面作业运行模式为 standalone per job.
>
> 现在遇到一个问题,业务方 Flink jar 任务使用了 org.apache.calcite.avatica 依赖,也就是下面依赖:
> 
> org.apache.calcite.avatica
> avatica-core
> ${avatica.version}
> 
>
> 但是这个依赖其实在 flink-table 模块中,也有这个依赖:
> [image: image.png]
>
> 由于 flink on k8s  standalone per job 模式,会把 Flink 任务 jar 包放入到 flink 本身的lib
> 包中,我在任务启动的时候,就会报:
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.calcite.avatica.ConnectionPropertiesImpl 错误。
>
> 按照我的理解,由于 Flink jar 任务包中有 avatica-core 依赖,同时在 flink lib
> 目录下面,flink-table_2.11-1.6-RELEASE.jar 中也有这个依赖,这两个都在 lib 目录下,然后就出现了类冲突问题。
>
> 请问怎么解决这个问题呢,非常期待你的回复。
>
> Best,
> LakeShen
>
>
>


Re: 退订

2020-07-16 Thread Leonard Xu
Hi,
是指取消订阅邮件吗?
可以发送任意内容的邮件到  user-zh-unsubscr...@flink.apache.org  取消订阅来自 
user-zh@flink.apache.org 邮件组的邮件

邮件组的订阅管理,可以参考[1]

祝好,
Leonard Xu
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list 
 

> 在 2020年7月16日,20:18,李国鹏  写道:
> 
> 退订



退订

2020-07-16 Thread 李国鹏
退订

Re: state无法从checkpoint中恢复

2020-07-16 Thread Congxian Qiu
Hi

1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象
2 能否把你关于 counts 的其他代码也贴一下
3. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看
4. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian


sun <1392427...@qq.com> 于2020年7月16日周四 下午6:16写道:

>
> 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //作业失败后不重启
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.getCheckpointConfig().setCheckpointTimeout(500);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setStateBackend(new
> RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
>   使用状态的代码private transient ListState
>
> @Override
> public void open(Configuration parameters) throws Exception {
> StateTtlConfig ttlConfig = StateTtlConfig
> .newBuilder(Time.minutes(30))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .build();
>
> ListStateDescriptor ListStateDescriptor<("lastUserLogin", String.class);
> lastUserLogin.enableTimeToLive(ttlConfig);
> counts = getRuntimeContext().getListState(lastUserLogin);
> }
> 我重启了task managers 后。发现  counts  里面的数据都丢失了


Re: 【求助】flink打包到集群运行问题

2020-07-16 Thread Congxian Qiu
Hi

图片的文字太小了,可以看一下这个邮件[1],应该是一个问题,按理在 google 能够搜索到这个邮件列表的

[1]
http://apache-flink.147419.n8.nabble.com/Could-not-find-a-suitable-table-factory-for-TableSourceFactory-td3287.html
Best,
Congxian


徐粟  于2020年7月16日周四 下午8:02写道:

>
>
> 下面是被转发的邮件:
>
> *发件人: *徐粟 
> *主题: **【求助】flink打包到集群运行问题*
> *日期: *2020年7月16日 GMT+8 下午7:51:06
> *收件人: *user-zh@flink.apache.org
>
> hi ,please help me
> 我打包到集群之后,产生了如下图错误。
> flink版本是1.10.1 jar包是flnk-1.10.1-bin-scala_2.12.taz
> 命令在图片里面。thanks
>
>
>


Re: flink state问题

2020-07-16 Thread Congxian Qiu
Hi

你可以尝试用 state-process-api[1] 看一下 savepoint 中 state 的内容,先缩小一下问题的范围,如果
savepoint 中就没有了,那就是序列化到 savepoint 的时候出错了,savepoitn 是有的,那么就是恢复的时候出错了。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html

Best,
Congxian


op <520075...@qq.com> 于2020年7月16日周四 下午7:16写道:

> 大家好
> 我有一个去重的需求,想节省内存用的bloomfilter,代码如下:
>  .keyBy(_._1).process(new
> KeyedProcessFunction[String,(String,String),String]() {
>   var state:ValueState[BloomFilter[CharSequence]]= null
>   override def open(parameters: Configuration): Unit = {
> val stateDesc = new
> ValueStateDescriptor("state",TypeInformation.of(new
> TypeHint[BloomFilter[CharSequence]](){}))
> state = getRuntimeContext.getState(stateDesc)
>   }
>   override def processElement(value: (String, String), ctx:
> KeyedProcessFunction[String, (String, String), String]#Context, out:
> Collector[String]) = {
>
> var filter = state.value
> if(filter==null){
>   println("null filter")
>   filter=
> BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel,1,0.0001)}
> //val contains = filter.mightContain(value._2)
> if(!filter.mightContain(value._2)) {
>   filter.put(value._2)
>   state.update(filter)
>   out.collect(value._2)
>
> }
>
>   }
> })
> 通过日志我看到每次我从savepoint恢复的时候这个state里面的bloomfilter都是null,这是为什么啊


[sql-client] 如何绕过交互式模式去做ddl

2020-07-16 Thread Harold.Miao
hi flink users

众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!


-- 

Best Regards,
Harold Miao


Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread Leonard Xu
Hi,

> 在 2020年7月16日,19:04,wangl...@geekplus.com.cn 写道:
> 
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep 
> 代码没找到类似的关系映射配置。

你DDL中不是写了 ‘format’ = ‘debzium-json’ 吗?就是这里指明的。

Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread Harold.Miao
我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码

private static  T findSingleInternal(
  Class factoryClass,
  Map properties,
  Optional classLoader) {

   List tableFactories = discoverFactories(classLoader);
   List filtered = filter(tableFactories, factoryClass, properties);

   if (filtered.size() > 1) {
  throw new AmbiguousTableFactoryException(
 filtered,
 factoryClass,
 tableFactories,
 properties);
   } else {
  return filtered.get(0);
   }
}

private static List
discoverFactories(Optional classLoader) {
   try {
  List result = new LinkedList<>();
  ClassLoader cl =
classLoader.orElse(Thread.currentThread().getContextClassLoader());
  ServiceLoader
 .load(TableFactory.class, cl)
 .iterator()
 .forEachRemaining(result::add);
  return result;
   } catch (ServiceConfigurationError e) {
  LOG.error("Could not load service provider for table factories.", e);
  throw new TableException("Could not load service provider for
table factories.", e);
   }

}


wangl...@geekplus.com.cn  于2020年7月16日周四 下午7:04写道:

>
> 我在
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> 找到了 SPI 的配置:
>
> org.apache.flink.formats.json.JsonFileSystemFormatFactory
> org.apache.flink.formats.json.JsonFormatFactory
> org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> org.apache.flink.formats.json.canal.CanalJsonFormatFactory
>
> 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> 代码没找到类似的关系映射配置。
>
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: godfrey he
> Send Time: 2020-07-16 16:38
> Receiver: user-zh
> Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
>
> Best,
> Godfrey
>
> wangl...@geekplus.com.cn  于2020年7月16日周四
> 下午4:02写道:
>
> > 比如:
> >
> > CREATE TABLE my_table (
> >   id BIGINT,
> >  first_name STRING,
> >  last_name STRING,
> >  email STRING
> > ) WITH (
> >  'connector'='kafka',
> >  'topic'='user_topic',
> >  'properties.bootstrap.servers'='localhost:9092',
> >  'scan.startup.mode'='earliest-offset',
> >  'format'='debezium-json'
> > );
> >
> > 最终解析 debezium-json 应该是
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > 下面的代码
> > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> >
> > 谢谢,
> > 王磊
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
>


-- 

Best Regards,
Harold Miao


Byte arrays in Avro

2020-07-16 Thread Lasse Nedergaard
Hi.

We have some Avro objects and some of them contain the primitive data
type bytes and it's translated into java.nio.ByteBuffer in the Avro
objects. When using our Avro object we get these warnings:

org.apache.flink.api.java.typeutils.TypeExtractor [] - class java.nio.
ByteBuffer does not contain a getter for field hb
org.apache.flink.api.java.typeutils.TypeExtractor [] - class java.nio.
ByteBuffer does not contain a setter for field hb
org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class java.nio.
ByteBuffer cannot be used as a POJO type because not all fields are valid
POJO fields, and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance.

and it's correct that ByteBuffer doesn't contain getter and setter for hb.

Flink documentation said "Note that Flink is automatically serializing
POJOs generated by Avro with the Avro serializer.", but when I debug it
looks like it fails back to generic type for the byte buffer and it
therefore make sense with the warnings.

I want to ensure we are running as effectively as possible.

So my questions are:
1. What is the most optimal way to transport byte arrays in Avro in Flink.
2. Do Flink use Avro serializer for our Avro object when they contain
ByteBuffer?


Thanks

Lasse Nedergaard


flink state????

2020-07-16 Thread op
??
??bloomfilter
 .keyBy(_._1).process(new KeyedProcessFunction[String,(String,String),String]() 
{
  var state:ValueState[BloomFilter[CharSequence]]= null
  override def open(parameters: Configuration): Unit = {
val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new 
TypeHint[BloomFilter[CharSequence]](){}))
state = getRuntimeContext.getState(stateDesc)
  }
  override def processElement(value: (String, String), ctx: 
KeyedProcessFunction[String, (String, String), String]#Context, out: 
Collector[String]) = {

var filter = state.value
if(filter==null){
  println("null filter")
  filter=  
BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel,1,0.0001)}
//val contains = filter.mightContain(value._2)
if(!filter.mightContain(value._2)) {
  filter.put(value._2)
  state.update(filter)
  out.collect(value._2)

}

  }
})
??savepoint??state??bloomfilternull??

Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread wangl...@geekplus.com.cn

我在 
flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 找到了 SPI 的配置:

org.apache.flink.formats.json.JsonFileSystemFormatFactory
org.apache.flink.formats.json.JsonFormatFactory
org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
org.apache.flink.formats.json.canal.CanalJsonFormatFactory

还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep 
代码没找到类似的关系映射配置。


谢谢,
王磊



wangl...@geekplus.com.cn 

 
Sender: godfrey he
Send Time: 2020-07-16 16:38
Receiver: user-zh
Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
 
Best,
Godfrey
 
wangl...@geekplus.com.cn  于2020年7月16日周四 下午4:02写道:
 
> 比如:
>
> CREATE TABLE my_table (
>   id BIGINT,
>  first_name STRING,
>  last_name STRING,
>  email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> 最终解析 debezium-json 应该是
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> 下面的代码
> 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>


Hadoop FS when running standalone

2020-07-16 Thread Lorenzo Nicora
Hi

I need to run my streaming job as a *standalone* Java application, for
testing
The job uses the Hadoop S3 FS and I need to test it (not a unit test).

The job works fine when deployed (I am using AWS Kinesis Data Analytics, so
Flink 1.8.2)

I have *org.apache.flink:flink-s3-fs-hadoop* as a "compile" dependency.

For running standalone, I have a Maven profile adding dependencies that are
normally provided (
*org.apache.flink:flink-java*, *org.apache.flink:flink-streaming-java_2.11*,
*org.apache.flink:flink-statebackend-rocksdb_2.11*,
*org.apache.flink:flink-connector-filesystem_2.11*) but I keep getting the
error "Hadoop is not in the classpath/dependencies" and it does not work.
I tried adding *org.apache.flink:flink-hadoop-fs* with no luck

What dependencies am I missing?

Cheers
Lorenzo


flink1.10升级到flink1.11 jar冲突

2020-07-16 Thread xyq
hello  大家好
 我在flink由1.10升级到1.11过程中遇到如下问题,请问是哪个包冲突了(本地可跑,上测试环境就报错),谢谢:


Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:329)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.io.IOException: Could not find class 
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
 in classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:721)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:187)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 15 more
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:718)



state??????checkpoint??????

2020-07-16 Thread sun
env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//
env.setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setCheckpointTimeout(500);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new 
RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
??private transient ListState

Flink on k8s 中,Jar 任务 avatica-core 依赖和 flink-table jar 冲突问题

2020-07-16 Thread LakeShen
Hi 社区,

我现在正在迁移任务到 k8s ,目前版本为 Flink 1.6 版本,k8s 上面作业运行模式为 standalone per job.

现在遇到一个问题,业务方 Flink jar 任务使用了 org.apache.calcite.avatica 依赖,也就是下面依赖:

org.apache.calcite.avatica
avatica-core
${avatica.version}


但是这个依赖其实在 flink-table 模块中,也有这个依赖:
[image: image.png]

由于 flink on k8s  standalone per job 模式,会把 Flink 任务 jar 包放入到 flink 本身的lib
包中,我在任务启动的时候,就会报:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.calcite.avatica.ConnectionPropertiesImpl 错误。

按照我的理解,由于 Flink jar 任务包中有 avatica-core 依赖,同时在 flink lib
目录下面,flink-table_2.11-1.6-RELEASE.jar 中也有这个依赖,这两个都在 lib 目录下,然后就出现了类冲突问题。

请问怎么解决这个问题呢,非常期待你的回复。

Best,
LakeShen


Flink 1.11 Hive Streaming Write的问题

2020-07-16 Thread 李佳宸
想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
批量的hive写入,流环境的读取是正常的。

附代码,很简短:

public class KafkaToHiveStreaming {
public static void main(String[] arg) throws Exception{
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
String name= "myhive";
String defaultDatabase = "default";
String hiveConfDir =
"/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
path
String version = "3.1.2";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
bsTableEnv.registerCatalog("myhive", hive);
bsTableEnv.useCatalog("myhive");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
bsTableEnv.executeSql("CREATE TABLE topic_products (" +
"  id BIGINT ," +
"  order_id STRING," +
"  amount DECIMAL(10, 2)," +
"  create_time TIMESTAMP " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order.test'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'properties.group.id' = 'testGroup'," +
" 'scan.startup.mode' = 'earliest-offset', " +
" 'format' = 'json'  " +
")");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
"  id BIGINT ," +
"  order_id STRING," +
"  amount DECIMAL(10, 2)" +
"  )");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
bsTableEnv.executeSql("CREATE TABLE print_table WITH
('connector' = 'print')" +
"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");

bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
"id, " +
"order_id, " +
"amount " +
"FROM topic_products");

Table table1 = bsTableEnv.from("hive_sink_table_streaming");
table1.executeInsert("print_table");
}
}


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi Yun,

我这边测试需要在集群上跑的,本地idea跑是没有问题的。
flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。

附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。

Peihui He  于2020年7月16日周四 下午5:26写道:

> Hi Yun,
>
> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
>
> Best wishes.
>
> Yun Tang  于2020年7月16日周四 下午5:04写道:
>
>> Hi Peihui
>>
>> Flink-1.10.1
>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>>
>>
>> [1]
>> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
>> 祝好
>> 唐云
>> 
>> From: Peihui He 
>> Sent: Thursday, July 16, 2020 16:15
>> To: user-zh@flink.apache.org 
>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>
>> Hi Yun,
>>
>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
>> 输入的特定的word抛出runtimeexception 使task
>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
>>
>> Caused by: java.nio.file.NoSuchFileException:
>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> ->
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>>
>> 情况和@chenxyz 类似。
>>
>> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
>>
>> 换成1.10.1 就可以了
>>
>> Best wishes.
>>
>> Yun Tang  于2020年7月15日周三 下午4:35写道:
>>
>> > Hi Robin
>> >
>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
>> > [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>> >
>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root
>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>> >
>> >
>> > [1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>> > [2]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>> >
>> > 祝好
>> > 唐云
>> >
>> >
>> > 
>> > From: Robin Zhang 
>> > Sent: Wednesday, July 15, 2020 16:23
>> > To: user-zh@flink.apache.org 
>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >
>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>> >
>> > Best
>> > Robin Zhang
>> > 
>> > From: Peihui He <[hidden email]>
>> > Sent: Tuesday, July 14, 2020 10:42
>> > To: [hidden email] <[hidden email]>
>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >
>> > hello,
>> >
>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>> >
>> >
>> > Caused by: java.nio.file.NoSuchFileException:
>> >
>> >
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> > ->
>> >
>> >
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>> >
>> > 配置和1.9.2 一样:
>> > state.backend: rocksdb
>> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
>> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
>> > state.backend.incremental: true
>> >
>> > 代码上都有
>> >
>> > env.enableCheckpointing(1);
>> >
>> >
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
>> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>> >
>> >
>> >   是1.10.0 需要做什么特别配置么?
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>> >
>>
>


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi Yun,

作业没有开启local recovery, 我这边测试1.10.0是必现的。

Best wishes.

Yun Tang  于2020年7月16日周四 下午5:04写道:

> Hi Peihui
>
> Flink-1.10.1
> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>
>
> [1]
> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
> 祝好
> 唐云
> 
> From: Peihui He 
> Sent: Thursday, July 16, 2020 16:15
> To: user-zh@flink.apache.org 
> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> Hi Yun,
>
> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
> 输入的特定的word抛出runtimeexception 使task
> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
>
> Caused by: java.nio.file.NoSuchFileException:
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> ->
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>
> 情况和@chenxyz 类似。
>
> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
>
> 换成1.10.1 就可以了
>
> Best wishes.
>
> Yun Tang  于2020年7月15日周三 下午4:35写道:
>
> > Hi Robin
> >
> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
> > [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
> >
> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root
> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
> >
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
> >
> > 祝好
> > 唐云
> >
> >
> > 
> > From: Robin Zhang 
> > Sent: Wednesday, July 15, 2020 16:23
> > To: user-zh@flink.apache.org 
> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >
> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
> >
> > Best
> > Robin Zhang
> > 
> > From: Peihui He <[hidden email]>
> > Sent: Tuesday, July 14, 2020 10:42
> > To: [hidden email] <[hidden email]>
> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >
> > hello,
> >
> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> >
> >
> > Caused by: java.nio.file.NoSuchFileException:
> >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> > ->
> >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> >
> > 配置和1.9.2 一样:
> > state.backend: rocksdb
> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
> > state.backend.incremental: true
> >
> > 代码上都有
> >
> > env.enableCheckpointing(1);
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> >
> >
> >   是1.10.0 需要做什么特别配置么?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: Is there a way to access the number of 'keys' that have been used for partioning?

2020-07-16 Thread orionemail
Thanks for the response, I thought as much.

Sent with [ProtonMail](https://protonmail.com) Secure Email.

‐‐‐ Original Message ‐‐‐
On Wednesday, 15 July 2020 17:12, Chesnay Schepler  wrote:

> This information is not readily available; in fact Flink itself doesn't know 
> how many keys there are at any point.
> You'd have to calculate it yourself.
>
> On 15/07/2020 17:11, orionemail wrote:
>
>> Hi,
>>
>> I need to query the number of keys that a stream has been split by, is there 
>> a way to do this?
>>
>> Thanks,
>>
>> O

来自李国鹏的邮件

2020-07-16 Thread 李国鹏
退订

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Yun Tang
Hi Peihui

Flink-1.10.1 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?


[1] 
https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
祝好
唐云

From: Peihui He 
Sent: Thursday, July 16, 2020 16:15
To: user-zh@flink.apache.org 
Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

Hi Yun,

不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
输入的特定的word抛出runtimeexception 使task
失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报

Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
-> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

情况和@chenxyz 类似。
http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html

换成1.10.1 就可以了

Best wishes.

Yun Tang  于2020年7月15日周三 下午4:35写道:

> Hi Robin
>
> 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
> schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>
> 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root
> cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>
> 祝好
> 唐云
>
>
> 
> From: Robin Zhang 
> Sent: Wednesday, July 15, 2020 16:23
> To: user-zh@flink.apache.org 
> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>
> Best
> Robin Zhang
> 
> From: Peihui He <[hidden email]>
> Sent: Tuesday, July 14, 2020 10:42
> To: [hidden email] <[hidden email]>
> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> hello,
>
> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>
>
> Caused by: java.nio.file.NoSuchFileException:
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> ->
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>
> 配置和1.9.2 一样:
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> state.savepoints.dir: hdfs:///flink/savepoints/wc/
> state.backend.incremental: true
>
> 代码上都有
>
> env.enableCheckpointing(1);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>
>
>   是1.10.0 需要做什么特别配置么?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread godfrey he
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors

Best,
Godfrey

wangl...@geekplus.com.cn  于2020年7月16日周四 下午4:02写道:

> 比如:
>
> CREATE TABLE my_table (
>   id BIGINT,
>  first_name STRING,
>  last_name STRING,
>  email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> 最终解析 debezium-json 应该是
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> 下面的代码
> 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>


Re:Re:回复:flink1.11 set yarn slots failed

2020-07-16 Thread Zhou Zach
nice, 可以不用看Command-Line Interface的文档了

















在 2020-07-16 16:16:00,"xiao cai"  写道:
>可以看这里https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> 原始邮件 
>发件人: Zhou Zach
>收件人: user-zh
>发送时间: 2020年7月16日(周四) 15:28
>主题: Re:回复:flink1.11 set yarn slots failed
>
>
>-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 
> 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > 
>> 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing 
>list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set 
>yarn slots failed > > >Hi all, 使用如下命令,设置Number of slots per TaskManager 
>/opt/flink-1.11.0/bin/flink run-application -t yarn-application \ 
>-Djobmanager.memory.process.size=1024m \ 
>-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
>发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
> of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
>Slots的值,现在每次提交作业都是0


flink1.11 Application 模式下启动失败问题

2020-07-16 Thread Hito Zhu
Hi all
我把作业提交模式从 yarn-cluster 换成 application 模式,启动失败,报两个错误:
1、java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.api.records.ResourceInformation
2、cannot assign instance of org.apache.commons.collections.map.LinkedMap to
field
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
of type org.apache.commons.collections.map.LinkedMap in instance of
com.tydic.tysc.core.flink.cal.v3.core.connector.kafka.source.KafkaTableSource$CustomerFlinkKafkaConsumer
在 yarn-cluster 下正常运行,请各位帮忙看下。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.11 Application 模式下启动失败问题

2020-07-16 Thread Hito Zhu
Hi all
我把作业提交模式从 yarn-cluster 换成 application 模式,启动失败,报两个错误:
1、java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.api.records.ResourceInformation
2、cannot assign instance of org.apache.commons.collections.map.LinkedMap to
field
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
of type org.apache.commons.collections.map.LinkedMap in instance of
com.tydic.tysc.core.flink.cal.v3.core.connector.kafka.source.KafkaTableSource$CustomerFlinkKafkaConsumer
在 yarn-cluster 下正常运行,请各位帮忙看下。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:回复:flink1.11 set yarn slots failed

2020-07-16 Thread xiao cai
可以看这里https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

 原始邮件 
发件人: Zhou Zach
收件人: user-zh
发送时间: 2020年7月16日(周四) 15:28
主题: Re:回复:flink1.11 set yarn slots failed


-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 
 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > 
> 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing 
list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set 
yarn slots failed > > >Hi all, 使用如下命令,设置Number of slots per TaskManager 
/opt/flink-1.11.0/bin/flink run-application -t yarn-application \ 
-Djobmanager.memory.process.size=1024m \ 
-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
 of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
Slots的值,现在每次提交作业都是0

Re:回复:flink1.11 set yarn slots failed

2020-07-16 Thread xiao cai
可以看这里


 原始邮件 
发件人: Zhou Zach
收件人: user-zh
发送时间: 2020年7月16日(周四) 15:28
主题: Re:回复:flink1.11 set yarn slots failed


-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 
 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > 
> 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing 
list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set 
yarn slots failed > > >Hi all, 使用如下命令,设置Number of slots per TaskManager 
/opt/flink-1.11.0/bin/flink run-application -t yarn-application \ 
-Djobmanager.memory.process.size=1024m \ 
-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
 of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
Slots的值,现在每次提交作业都是0

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi Yun,

不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
输入的特定的word抛出runtimeexception 使task
失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报

Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
-> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

情况和@chenxyz 类似。
http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html

换成1.10.1 就可以了

Best wishes.

Yun Tang  于2020年7月15日周三 下午4:35写道:

> Hi Robin
>
> 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
> schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>
> 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root
> cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>
> 祝好
> 唐云
>
>
> 
> From: Robin Zhang 
> Sent: Wednesday, July 15, 2020 16:23
> To: user-zh@flink.apache.org 
> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>
> Best
> Robin Zhang
> 
> From: Peihui He <[hidden email]>
> Sent: Tuesday, July 14, 2020 10:42
> To: [hidden email] <[hidden email]>
> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> hello,
>
> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>
>
> Caused by: java.nio.file.NoSuchFileException:
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> ->
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>
> 配置和1.9.2 一样:
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> state.savepoints.dir: hdfs:///flink/savepoints/wc/
> state.backend.incremental: true
>
> 代码上都有
>
> env.enableCheckpointing(1);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>
>
>   是1.10.0 需要做什么特别配置么?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink yarn session exception

2020-07-16 Thread Paul Lam
日志里说得比较清楚了,classpath 里没有 Hadoop 的 lib。可以参考这个文档 [1] 来配置你的环境。

1. 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
 

Best,
Paul Lam

> 2020年7月16日 15:46,Rainie Li  写道:
> 
> 大佬们好,我是flink新手,正在用flink 1.9.1
> Flink APP cannot run, APP log error,  想求教一下会是什么原因造成的,多谢
> 
> 2020-06-16 17:06:21,921 WARN  org.apache.flink.client.cli.CliFrontend
>- Could not load CLI class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.
> java.lang.NoClassDefFoundError:
> org/apache/hadoop/yarn/exceptions/YarnException
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.yarn.exceptions.YarnException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 5 more
> 2020-06-16 17:06:21,980 INFO  org.apache.flink.core.fs.FileSystem
>- Hadoop is not in the classpath/dependencies. The extended
> set of supported File Systems via Hadoop is not available.



Re: HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

2020-07-16 Thread Danny Chan
I suspect there are some inconsistency in the nullability of the whole record 
field, can you compare the 2 schema and see the diff ? For a table, you can get 
the TableSchema first and print it out.

Best,
Danny Chan
在 2020年7月16日 +0800 AM10:56,Leonard Xu ,写道:
> Hi, Jim
>
> Could you post error message in text that contains the entire schema of query 
> and sink? I doubt there are some  fields type were mismatched.
>
> Best,
> Leonard Xu
>
> > 在 2020年7月16日,10:29,Jim Chen  写道:
> >
> > Hi,
> >   I use flink1.10.1 sql to connect hbase1.4.3. When inset into hbase, 
> > report an error like validateSchemaAndApplyImplicitCast. Means that the 
> > Query Schema and Sink Schema are inconsistent.
> >   Mainly Row (EXPR$0) in Query Schema, which are all expressions. Sink 
> > Schema is Row(device_id). I don't know how to write in sql to be consistent 
> > with hbase's sink schema.
> >   I try to write sql like select device_id as rowkey, ROW( device_id as 
> > [cannot write as]  ) as f1
> >
> > error message as follow:
> > 
> >
> > sample code like:
> > HBase sink ddl:
> > String ddlSource = "CREATE TABLE 
> > test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
> >                 "  rowkey STRING,\n" +
> >                 "  f1 ROW< \n" +
> >                 "        device_id STRING,\n" +
> >                 "        pass_id STRING,\n" +
> >                 "        first_date STRING,\n" +
> >                 "        first_channel_id STRING,\n" +
> >                 "        first_app_version STRING,\n" +
> >                 "        first_server_time STRING,\n" +
> >                 "        first_server_hour STRING,\n" +
> >                 "        first_ip_location STRING,\n" +
> >                 "        first_login_time STRING,\n" +
> >                 "        sys_can_uninstall STRING,\n" +
> >                 "        update_date STRING,\n" +
> >                 "        server_time BIGINT,\n" +
> >                 "        last_pass_id STRING,\n" +
> >                 "        last_channel_id STRING,\n" +
> >                 "        last_app_version STRING,\n" +
> >                 "        last_date STRING,\n" +
> >                 "        os STRING,\n" +
> >                 "        attribution_channel_id STRING,\n" +
> >                 "        attribution_first_date STRING,\n" +
> >                 "        p_product STRING,\n" +
> >                 "        p_project STRING,\n" +
> >                 "        p_dt STRING\n" +
> >                 "        >\n" +
> >                 ") WITH (\n" +
> >                 "  'connector.type' = 'hbase',\n" +
> >                 "  'connector.version' = '1.4.3',\n" + // 
> > 即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行
> >                 "  'connector.table-name' = 
> > 'dw_common_mobile_device_user_mapping_new',\n" +
> >                 "  'connector.zookeeper.quorum' = '"+ zookeeperServers 
> > +"',\n" +
> >                 "  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
> >                 "  'connector.write.buffer-flush.max-size' = '2mb',\n" +
> >                 "  'connector.write.buffer-flush.max-rows' = '1000',\n" +
> >                 "  'connector.write.buffer-flush.interval' = '2s'\n" +
> >                 ")";
> >
> > insert into sql:
> >
> > String bodyAndLocalSql = "" +
> > //                "insert into 
> > test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
> >                 "SELECT CAST(rowkey AS STRING) AS rowkey, " +
> >                 " ROW(" +
> >                 " device_id, pass_id, first_date, first_channel_id, 
> > first_app_version, first_server_time, first_server_hour, first_ip_location, 
> > first_login_time, sys_can_uninstall, update_date, server_time, 
> > last_pass_id, last_channel_id, last_app_version, last_date, os, 
> > attribution_channel_id, attribution_first_date, p_product, p_project, p_dt 
> > " +
> >                 ") AS f1" +
> >                 " FROM " +
> >                 "(" +
> >                 " SELECT " +
> >                 " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, 
> > kafka.p_project)) AS rowkey, " +
> >                 " kafka.uid AS device_id " +
> >                 ",kafka.pass_id " +
> >
> >                 // first_date
> >                 ",CASE WHEN COALESCE(hbase.server_time, 0) <= 
> > kafka.server_time " +
> >                 // 新用户
> >                 " THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd') " +
> >                 // 老用户
> >                 " ELSE hbase.first_date END AS first_date " +
> >
> >                 // first_channel_id
> >                 ",CASE WHEN COALESCE(hbase.server_time, 0) <= 
> > kafka.server_time " +
> >                 // 新用户
> >                 " THEN kafka.wlb_channel_id" +
> >                 // 老用户
> >                 " ELSE hbase.first_channel_id END AS first_channel_id " +
> >
> >                 // first_app_version
> >                 ",CASE WHEN COALESCE(hbase.server_time, 0) <= 
> > 

Re: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi chenxyz,

我们遇到的问题应该是一样的,换了1.10.1 后就可以从checkpoint 中恢复了。珞

Best wishes.

chenxyz  于2020年7月15日周三 下午9:53写道:

>
>
>
> Hello,
> Peihui,可以参考下是不是和这个问题类似?之前我在1.10.0也遇到过。
>
> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html#a2239
> 解决方式:
> 1. 使用hdfs作为状态后端不会报错
> 2. 升级至1.10.1使用rocksdb也不会出现该问题
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-14 14:41:53,"Peihui He"  写道:
> >Hi Yun,
> >
> >我这边用一个word count 例子,socket -> flatmap -> keyBy -> reduce ->
> >print. 在flatmap 中当出现特定word的时候就抛出一个runtimeException。在1.9.2
> >里面是可以从checkpoint中自动恢复上次做checkpoint的时候的状态,但是用1.10.0 就不能。环境是flink on
> >yarn。
> >
> >Best wishes.
> >
> >Yun Tang  于2020年7月14日周二 上午11:57写道:
> >
> >> Hi Peihui
> >>
> >>
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> >> cause。
> >>
> >> [1]
> >>
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
> >>
> >>
> >> 祝好
> >> 唐云
> >> 
> >> From: Peihui He 
> >> Sent: Tuesday, July 14, 2020 10:42
> >> To: user-zh@flink.apache.org 
> >> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >>
> >> hello,
> >>
> >> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> >>
> >>
> >> Caused by: java.nio.file.NoSuchFileException:
> >>
> >>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> >> ->
> >>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> >>
> >> 配置和1.9.2 一样:
> >> state.backend: rocksdb
> >> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> >> state.savepoints.dir: hdfs:///flink/savepoints/wc/
> >> state.backend.incremental: true
> >>
> >> 代码上都有
> >>
> >> env.enableCheckpointing(1);
> >>
> >>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> >> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> >>
> >>
> >>   是1.10.0 需要做什么特别配置么?
> >>
>


flink1.11 Application 模式下启动失败问题

2020-07-16 Thread Hito Zhu
Hi all
我把作业提交模式从 yarn-cluster 换成 application 模式,启动失败,报两个错误:
1、java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.api.records.ResourceInformation
2、cannot assign instance of org.apache.commons.collections.map.LinkedMap to
field
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
of type org.apache.commons.collections.map.LinkedMap in instance of
com.tydic.tysc.core.flink.cal.v3.core.connector.kafka.source.KafkaTableSource$CustomerFlinkKafkaConsumer
在 yarn-cluster 下正常运行,请各位帮忙看下。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval

2020-07-16 Thread wangl...@geekplus.com.cn

直接在 flink-conf.yaml 文件中加配置
execution.checkpointing.interval: 6




wangl...@geekplus.com.cn 

 
Sender: Harold.Miao
Send Time: 2020-07-16 13:27
Receiver: user-zh
Subject: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval
hi flink users
 
通过sql-client提交sql怎么设置checkpointing.interval?
我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。
谢谢
 
 
 
-- 
 
Best Regards,
Harold Miao


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi  Congxian,

不好意思,本来想准备下例子再回下邮件的,一直拖了这么久。
情况是你说的第2种。
同@chenxyz遇到的情况类似,日志可以参考chenxyz发的
http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html


按照chenxyz 的建议换了1.10.1版本后就没有问题了。

Best wishes.


Congxian Qiu  于2020年7月15日周三 下午1:04写道:

> Hi
>
> 我尝试理解一下:
> 1 你用 1.9 跑 wordcount 作业,然后执行了一些 checkpoint,然后停止作业,然后使用 1.10 从之前 1.9 的作业生成的
> checkpoint 恢复,发现恢复不了?
> 2 你用作业 1.10 跑 wordcount,然后遇到特定的 word 会抛异常,然后 failover,发现不能从 checkpoint 恢复?
>
> 你这里的问题是第 1 种还是第 2 种呢?
>
> 另外能否分享一下你的操作步骤以及出错时候的 taskmanager log 呢?
>
> Best,
> Congxian
>
>
> Peihui He  于2020年7月14日周二 下午2:46写道:
>
> > Hi Congxian,
> >
> > 这个错误是从1.9 升级到1.10 遇到的问题。用简单的wordcount 测试,自己根据特定word
> > 抛出runtimeException,就能够重现。flink on yarn 和 flink on k8s 都出现这个问题。1.10
> > 都不能从上次的checkpoint状态中恢复。不知道是不是1.10需要其他配置呢?
> >
> > Best wishes.
> >
> > Congxian Qiu  于2020年7月14日周二 下午1:54写道:
> >
> > > Hi
> > >
> > > 这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢?
> > > 另外你可以看下 tm log 看看有没有其他异常
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > Yun Tang  于2020年7月14日周二 上午11:57写道:
> > >
> > > > Hi Peihui
> > > >
> > > >
> > >
> >
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> > > > cause。
> > > >
> > > > [1]
> > > >
> > >
> >
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
> > > >
> > > >
> > > > 祝好
> > > > 唐云
> > > > 
> > > > From: Peihui He 
> > > > Sent: Tuesday, July 14, 2020 10:42
> > > > To: user-zh@flink.apache.org 
> > > > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> > > >
> > > > hello,
> > > >
> > > > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> > > >
> > > >
> > > > Caused by: java.nio.file.NoSuchFileException:
> > > >
> > > >
> > >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> > > > ->
> > > >
> > >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> > > >
> > > > 配置和1.9.2 一样:
> > > > state.backend: rocksdb
> > > > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> > > > state.savepoints.dir: hdfs:///flink/savepoints/wc/
> > > > state.backend.incremental: true
> > > >
> > > > 代码上都有
> > > >
> > > > env.enableCheckpointing(1);
> > > >
> > > >
> > >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > > > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> > > > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> > > >
> > > >
> > > >   是1.10.0 需要做什么特别配置么?
> > > >
> > >
> >
>


flink1.11 Application 模式下启动失败问题

2020-07-16 Thread Hito Zhu
Hi all
我把作业提交模式从 yarn-cluster 换成 application 模式,启动失败,报两个错误:
1、java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.api.records.ResourceInformation
2、cannot assign instance of org.apache.commons.collections.map.LinkedMap to
field
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
of type org.apache.commons.collections.map.LinkedMap in instance of
com.tydic.tysc.core.flink.cal.v3.core.connector.kafka.source.KafkaTableSource$CustomerFlinkKafkaConsumer
在 yarn-cluster 下正常运行,请各位帮忙看下。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread wangl...@geekplus.com.cn
比如:

CREATE TABLE my_table (
  id BIGINT,
 first_name STRING,
 last_name STRING,
 email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);

最终解析 debezium-json 应该是  
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium 
下面的代码
但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?

谢谢,
王磊


wangl...@geekplus.com.cn 



Re: Manual allocation of slot usage

2020-07-16 Thread Mu Kong
Hi, Song, Guo,

We updated our cluster to 1.10.1 and the cluster.evenly-spread-out-slots
works pretty well now.
Thanks for your help!

Best regards,
Mu

On Wed, Jul 8, 2020 at 9:35 PM Mu Kong  wrote:

> Hi Song, Guo,
>
> Thanks for the information.
> I will first upgrade our flink cluster to 1.10.0 and try again.
> Currently, we are encountering some dependency conflict issue, possibly
> with tranquility. But that is another issue.
>
> For your information, (also as I described in the previous email)
> *What Flink deployment are you using? (Standalone/K8s/Yarn/Mesos): *we
> are running a standalone cluster with version 1.9.0.
> *How many times have you tried with and without
> `cluster.evenly-spread-out-slots`? *Almost all the time. This is the
> first time we tried it. The behavior before we changed the config, is that
> a great amount of subtasks of the source (11 subtasks) were allocated in
> one task manager, and the rest of the subtasks for that source we were
> spread unevenly to all rest task managers. After changing the
> configuration, the subtasks of this source took all the slots on 4 of our
> task managers, which was more "skewed" than before.
> *How many TMs do you have? And how many slots does each TM has? *We have
> 15 task manager with 15 slots on each.
>
> I will try to reproduce this tomorrow(JST) when I have time.
>
> Best regards,
> Mu
>
> On Wed, Jul 8, 2020 at 11:01 AM Xintong Song 
> wrote:
>
>> Hi Mu,
>> Regarding your questions.
>>
>>- The feature `spread out tasks evenly across task managers` is
>>introduced in Flink 1.10.0, and backported to Flink 1.9.2, per the JIRA
>>ticket [1]. That means if you configure this option in Flink 1.9.0, it
>>should not take any effect.
>>- Please be aware that this feature ATM only works for standalone
>>deployment (including standalone Kubernetes deployment). For the native
>>Kubernetes, Yarn and Mesos deployment, it is a known issue that this
>>feature does not work as expected.
>>- Regarding the scheduling behavior changes, we would need more
>>information to explain this. To provide the information needed, the 
>> easiest
>>way is probably to provide the jobmanager log files, if you're okay with
>>sharing them. If you cannot share the logs, then it would be better to
>>answer the following questions
>>   - What Flink deployment are you using? (Standalone/K8s/Yarn/Mesos)
>>   - How many times have you tried with and without
>>   `cluster.evenly-spread-out-slots`? In other words, the described 
>> behaviors
>>   before and after setting `cluster.evenly-spread-out-slots`, can they be
>>   stably reproduced?
>>   - How many TMs do you have? And how many slots does each TM has?
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12122
>>
>> On Tue, Jul 7, 2020 at 8:33 PM Mu Kong  wrote:
>>
>>> Hi, Guo,
>>>
>>> Thanks for helping out.
>>>
>>> My application has a kafka source with 60 subtasks(parallelism), and we
>>> have 15 task managers with 15 slots on each.
>>>
>>> *Before I applied the cluster.evenly-spread-out-slots,* meaning it is
>>> set to default false, the operator 'kafka source" has 11 subtasks allocated
>>> in one single task manager,
>>> while the remaining 49 subtasks of "kafka source" distributed to the
>>> remaining 14 task managers.
>>>
>>> *After I set cluster.evenly-spread-out-slots to true*, the 60 subtasks
>>> of "kafka source" were allocated to only 4 task managers, and they took 15
>>> slots on each of these 4 TMs.
>>>
>>> What I thought is that this config will make the subtasks of one
>>> operator more evenly spread among the task managers, but it seems it made
>>> them allocated in the same task manager as much as possible.
>>>
>>> The version I'm deploying is 1.9.0.
>>>
>>> Best regards,
>>> Mu
>>>
>>> On Tue, Jul 7, 2020 at 7:10 PM Yangze Guo  wrote:
>>>
 Hi, Mu,

 IIUC, cluster.evenly-spread-out-slots would fulfill your demand. Why
 do you think it does the opposite of what you want. Do you run your
 job in active mode? If so, cluster.evenly-spread-out-slots might not
 work very well because there could be insufficient task managers when
 request slot from ResourceManager. This has been discussed in
 https://issues.apache.org/jira/browse/FLINK-12122 .


 Best,
 Yangze Guo

 On Tue, Jul 7, 2020 at 5:44 PM Mu Kong  wrote:
 >
 > Hi community,
 >
 > I'm running an application to consume data from kafka, and process it
 then put data to the druid.
 > I wonder if there is a way where I can allocate the data source
 consuming process evenly across the task manager to maximize the usage of
 the network of task managers.
 >
 > So, for example, I have 15 task managers and I set parallelism for
 the kafka source as 60, since I have 60 partitions in kafka topic.
 > What I want is flink cluster will put 4 kafka 

Flink yarn session exception

2020-07-16 Thread Rainie Li
大佬们好,我是flink新手,正在用flink 1.9.1
Flink APP cannot run, APP log error,  想求教一下会是什么原因造成的,多谢

2020-06-16 17:06:21,921 WARN  org.apache.flink.client.cli.CliFrontend
- Could not load CLI class
org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError:
org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 5 more
2020-06-16 17:06:21,980 INFO  org.apache.flink.core.fs.FileSystem
- Hadoop is not in the classpath/dependencies. The extended
set of supported File Systems via Hadoop is not available.


Accumulators in Table API

2020-07-16 Thread Flavio Pompermaier
Hi to all,
in my legacy code (using Dataset api) I used to add a map function just
after the Source read and keep the count of the rows. In this way I had a
very light and unobtrusive way of counting the rows of a dataset. Can I do
something similar in table API? Is there a way to use accumulators?

Thanks in advance,
Flavio


Re:回复:flink1.11 set yarn slots failed

2020-07-16 Thread Zhou Zach
-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了

















在 2020-07-16 15:03:14,"flinkcx"  写道:
>是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4
>
>
> 原始邮件 
>发件人: Zhou Zach
>收件人: Flink user-zh mailing list
>发送时间: 2020年7月16日(周四) 14:51
>主题: flink1.11 set yarn slots failed
>
>
>Hi all, 使用如下命令,设置Number of slots per TaskManager /opt/flink-1.11.0/bin/flink 
>run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ 
>-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
>发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
> of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
>Slots的值,现在每次提交作业都是0


Re: Print table content in Flink 1.11

2020-07-16 Thread Flavio Pompermaier
Thanks for the suggestions Kurt and Jingsong! Very helpful

On Thu, Jul 16, 2020 at 4:30 AM Jingsong Li  wrote:

> Hi Flavio,
>
> For print:
> - As Kurt said, you can use `table.execute().print();`, records will be
> collected to the client (NOTE it is client) and print to client console.
> - But if you want print records in runtime tasks like DataStream.print,
> you can use [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/print.html
>
> Best,
> Jingsong
>
> On Thu, Jul 16, 2020 at 10:18 AM Kurt Young  wrote:
>
>> Hi Flavio,
>>
>> In 1.11 we have provided an easier way to print table content, after you
>> got the `table` object,
>> all you need to to is calling `table.execute().print();`
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Jul 16, 2020 at 9:35 AM Leonard Xu  wrote:
>>
>>> Hi, Flavio
>>>
>>>
>>> 在 2020年7月16日,00:19,Flavio Pompermaier  写道:
>>>
>>> final JobExecutionResult jobRes = tableEnv.execute("test-job");
>>>
>>>
>>> In Flink 1.11, once a Table has transformed to DataStream, only
>>> StreamExecutionEnvironment can execute the DataStream program, please use
>>> env.execute(“test-job”) in this case, you can get mote information from [1].
>>>
>>>
>>> Best,
>>> Leonard Xu
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>>
>>>
>
> --
> Best, Jingsong Lee
>


Re: Using md5 hash while sinking files to s3

2020-07-16 Thread Chesnay Schepler

Please try configuring :

fs.s3a.etag.checksum.enabled: true


On 16/07/2020 03:11, nikita Balakrishnan wrote:

Hello team,

I’m developing a system where we are trying to sink to an immutable s3
bucket. This bucket has server side encryption set as KMS. The DataStream
sink works perfectly fine when I don’t use the immutable bucket but when I
use an immutable bucket, I get exceptions regarding multipart upload
failures. It says we need to enable md5 hashing for the put object to work.

Here’s the stack trace:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
java.io.IOException: Uploading parts failed
... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
at
org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
at
org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on
raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt:
com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header
is required for Put Part requests with Object Lock parameters (Service:
Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx;
S3 Extended Request ID: ), S3 Extended Request ID: xx
:InvalidRequest: Content-MD5 HTTP header is required for Put Part requests
with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error
Code: InvalidRequest; Request ID: ; S3 Extended Request ID: )
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
at
org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
at
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
at

Re: flink1.11 set yarn slots failed

2020-07-16 Thread Yang Wang
-t是新引入的参数,是不支持以前的-yxxx参数的
你需要使用-Dtaskmanager.numberOfTaskSlots=4这样来设置

Zhou Zach  于2020年7月16日周四 下午2:51写道:

> Hi all,
>
>
> 使用如下命令,设置Number of slots per TaskManager
> /opt/flink-1.11.0/bin/flink run-application -t yarn-application \
> -Djobmanager.memory.process.size=1024m \
> -Dtaskmanager.memory.process.size=2048m \
>  -ys 4 \
>
>
> 发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
> of slots per TaskManager?
> 另外,有哪些方式可以增Flink UI中的大Available Task Slots的值,现在每次提交作业都是0


回复:flink1.11 set yarn slots failed

2020-07-16 Thread flinkcx
是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4


 原始邮件 
发件人: Zhou Zach
收件人: Flink user-zh mailing list
发送时间: 2020年7月16日(周四) 14:51
主题: flink1.11 set yarn slots failed


Hi all, 使用如下命令,设置Number of slots per TaskManager /opt/flink-1.11.0/bin/flink 
run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ 
-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
 of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
Slots的值,现在每次提交作业都是0

flink1.11 set yarn slots failed

2020-07-16 Thread Zhou Zach
Hi all,


使用如下命令,设置Number of slots per TaskManager
/opt/flink-1.11.0/bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=2048m \
 -ys 4 \


发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
 of slots per TaskManager?
另外,有哪些方式可以增Flink UI中的大Available Task Slots的值,现在每次提交作业都是0

Re: 【求助】Flink Hadoop依赖问题

2020-07-16 Thread Yang Wang
你可以在Pod里面确认一下/data目录是否正常挂载,另外需要在Pod里ps看一下
起的JVM进程里的classpath是什么,有没有包括hadoop的jar


当然,使用Roc Marshal建议的增加flink-shaded-hadoop并且放到$FLINK_HOME/lib下也可以解决问题

Best,
Yang

Roc Marshal  于2020年7月15日周三 下午5:09写道:

>
>
>
> 你好,Z-Z,
>
> 可以尝试在
> https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/
> 下载对应的uber jar包,并就将下载后的jar文件放到flink镜像的 ${FLINK_HOME}/lib 路径下,之后启动编排的容器。
> 祝好。
> Roc Marshal.
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-15 10:47:39,"Z-Z"  写道:
> >我在使用Flink 1.11.0版本中,使用docker-compose搭建,docker-compose文件如下:
> >version: "2.1"
> >services:
> > jobmanager:
> >  image: flink:1.11.0-scala_2.12
> >  expose:
> >   - "6123"
> >  ports:
> >   - "8081:8081"
> >  command: jobmanager
> >  environment:
> >   - JOB_MANAGER_RPC_ADDRESS=jobmanager
> >   -
> HADOOP_CLASSPATH=/data/hadoop-2.9.2/etc/hadoop:/data/hadoop-2.9.2/share/hadoop/common/lib/*:/data/hadoop-2.9.2/share/hadoop/common/*:/data/hadoop-2.9.2/share/hadoop/hdfs:/data/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/data/hadoop-2.9.2/share/hadoop/hdfs/*:/data/hadoop-2.9.2/share/hadoop/yarn:/data/hadoop-2.9.2/share/hadoop/yarn/lib/*:/data/hadoop-2.9.2/share/hadoop/yarn/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/data/hadoop-2.9.2/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar
> >  volumes:
> >   - ./jobmanager/conf:/opt/flink/conf
> >   - ./data:/data
> >
> >
> > taskmanager:
> >  image: flink:1.11.0-scala_2.12
> >  expose:
> >   - "6121"
> >   - "6122"
> >  depends_on:
> >   - jobmanager
> >  command: taskmanager
> >  links:
> >   - "jobmanager:jobmanager"
> >  environment:
> >   - JOB_MANAGER_RPC_ADDRESS=jobmanager
> >  volumes:
> >   - ./taskmanager/conf:/opt/flink/conf
> >networks:
> > default:
> >  external:
> >   name: flink-network
> >
> >
> >
> >hadoop-2.9.2已经放在data目录了,且已经在jobmanager和taskmanager的环境变量里添加了HADOOP_CLASSPATH,但通过cli提交和webui提交,jobmanager还是提示报Could
> not find a file system implementation for scheme 'hdfs'。有谁知道是怎么回事吗?
>


?????? ??????????: flink state

2020-07-16 Thread Robert.Zhang
??thanku all



----
??: "Congxian Qiu"https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
 [2] https://cloud.tencent.com/developer/article/1509789
 Best,
 Congxian


 Robert.Zhang <173603...@qq.comgt; ??2020??7??13?? 9:50??

 gt; Hello,all
 gt; stream
 gt; state keyed streamglobal
 gt; 
parameter??statestream
 gt; operator??
 gt;
 gt;
 gt; Best regards


Re: ElasticSearch_Sink

2020-07-16 Thread Yun Gao
Hi Dinesh,

   As far as I know, to implement the 2 phase commit protocol for one external 
system, I think the external system is required to provide some kind of 
transactions that could stay across sessions. With such a transaction mechansim 
then we could first start a new transaction and write the data, then we 
precommit the transaction when checkpointing and commit the transaction when on 
checkpoint complete notificatoin. After failover, we could be able to recover 
the transaction and abort them (if not precommitted) or commit them (if 
precommitted) again. As an example, for JDBC we may have to use XA transaction 
instead of normal JDBC transaction, since JDBC transaction will always be 
aborted when failover, even if we have precommitted.

  If such a transaction mechanism is not provided by the external system, we 
may have to use a secondary system (Like WAL logs or JDBC Table) to first cache 
the data and only write the data to the final system on commit. Note that since 
a transaction might be committed multiple times, the final system could still 
need to deduplicate the records or have some kind of transaction mechansim 
always aborted on failover.

Best,
Yun

--
Sender:C DINESH
Date:2020/07/16 11:01:02
Recipient:user
Theme:ElasticSearch_Sink

Hello All,

Can we implement 2 Phase Commit Protocol for elastic search sink. Will there be 
any limitations?

Thanks in advance.

Warm regards,
Dinesh. 


  1   2   >