Re: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-07 文章 Rui Li
streaming file committer在提交分区之前会打印这样的日志:

LOG.info("Partition {} of table {} is ready to be committed",
partSpec, tableIdentifier);

partition commit policy会在成功提交分区以后打印这样的日志:

LOG.info("Committed partition {} to metastore", partitionSpec);

LOG.info("Committed partition {} with success file", context.partitionSpec());

可以检查一下这样的日志,看是不是卡在什么地方了

On Tue, Sep 8, 2020 at 11:02 AM 夏帅  wrote:

> 就第二次提供的日志看,好像是你的namenode出现的问题
>
>
> --
> 发件人:MuChen <9329...@qq.com>
> 发送时间:2020年9月8日(星期二) 10:56
> 收件人:user-zh@flink.apache.org 夏帅 ; user-zh <
> user-zh@flink.apache.org>
> 主 题:回复: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败
>
> 在checkpoint失败的时间,tm上还有一些info和warn级别的日志:
> 2020-09-04 17:17:59,520 INFO
> org.apache.hadoop.io.retry.RetryInvocationHandler [] - Exception while
> invoking create of class ClientNamenodeProtocolTranslatorPB over
> uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts.
> Trying to fail over immediately.
> java.io.IOException: java.lang.InterruptedException
> at org.apache.hadoop.ipc.Client.call(Client.java:1449)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.hadoop.ipc.Client.call(Client.java:1401)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?]
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_144]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_144]
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at com.sun.proxy.$Proxy27.create(Unknown Source) ~[?:?]
> at
> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1721)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1657)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1582)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
> ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
> ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.table.filesystem.SuccessFileCommitPolicy.commit(SuccessFileCommitPolicy.java:45)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.table.filesystem.stream.StreamingFileCommitter.commitPartitions(StreamingFileCommitter.java:167)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.table.filesystem.stream.StreamingFileCommitter.processElement(StreamingFileCommitter.java:144)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
> ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
> at 
> 

回复:消费kafka数据乱序问题

2020-09-07 文章 Shuai Xia
可以通过设置Key的方式保证同一个账户的数据是有序的


--
发件人:Danny Chan 
发送时间:2020年9月8日(星期二) 11:12
收件人:user-zh 
主 题:Re: 消费kafka数据乱序问题

你的 source 消费单/多 partition 数据相对 partition 来说仍然是有序的 只是 source 和下游 operator 如果存在数据 
shuffle 就会破坏顺序,目前想保序,一种办法是 source 的并发和下游保持一致。

Best,
Danny Chan
在 2020年9月4日 +0800 PM4:40,smq <374060...@qq.com>,写道:
> 大家好
>  
> 现在碰到一个关于处理乱序的问题,业务场景是银行余额的更新,数据源是kafka,有一个账户余额字段,sink到kudu,更新客户余额.
> 如果对于同一个账户的多笔操作出现乱序,可能会导致客户余额不对。比如一个客户账户有100块,先消费100,然后存100,按照正常的数据处理顺序,帐户余额应该还是100的,假如说先处理了存钱,存上之后余额是100-100+100=100,然后处理消费100的那条数据,由于消费在前,这时这条数据对应的余额是100-100=0。
>  这样的话就出现了统计余额错误,请问想要按照事件时间处理数据是不是可以先keyby,然后用watermark.


Re: 消费kafka数据乱序问题

2020-09-07 文章 Danny Chan
你的 source 消费单/多 partition 数据相对 partition 来说仍然是有序的 只是 source 和下游 operator 如果存在数据 
shuffle 就会破坏顺序,目前想保序,一种办法是 source 的并发和下游保持一致。

Best,
Danny Chan
在 2020年9月4日 +0800 PM4:40,smq <374060...@qq.com>,写道:
> 大家好
>  
> 现在碰到一个关于处理乱序的问题,业务场景是银行余额的更新,数据源是kafka,有一个账户余额字段,sink到kudu,更新客户余额.
> 如果对于同一个账户的多笔操作出现乱序,可能会导致客户余额不对。比如一个客户账户有100块,先消费100,然后存100,按照正常的数据处理顺序,帐户余额应该还是100的,假如说先处理了存钱,存上之后余额是100-100+100=100,然后处理消费100的那条数据,由于消费在前,这时这条数据对应的余额是100-100=0。
>  这样的话就出现了统计余额错误,请问想要按照事件时间处理数据是不是可以先keyby,然后用watermark.


回复:回复:使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-07 文章 夏帅
就第二次提供的日志看,好像是你的namenode出现的问题


--
发件人:MuChen <9329...@qq.com>
发送时间:2020年9月8日(星期二) 10:56
收件人:user-zh@flink.apache.org 夏帅 ; user-zh 

主 题:回复: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败

在checkpoint失败的时间,tm上还有一些info和warn级别的日志:
2020-09-04 17:17:59,520 INFO org.apache.hadoop.io.retry.RetryInvocationHandler 
[] - Exception while invoking create of class 
ClientNamenodeProtocolTranslatorPB over 
uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts. Trying to 
fail over immediately.
java.io.IOException: java.lang.InterruptedException
at org.apache.hadoop.ipc.Client.call(Client.java:1449) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1401) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?]
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) ~[?:?]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_144]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_144]
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.sun.proxy.$Proxy27.create(Unknown Source) ~[?:?]
at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1721)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1657) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1582) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.SuccessFileCommitPolicy.commit(SuccessFileCommitPolicy.java:45)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.stream.StreamingFileCommitter.commitPartitions(StreamingFileCommitter.java:167)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.stream.StreamingFileCommitter.processElement(StreamingFileCommitter.java:144)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 

?????? ??????????StreamingFileSink??hive metadata??????????????????

2020-09-07 文章 MuChen
??checkpointtm??info??warn
2020-09-04 17:17:59,520 INFO  org.apache.hadoop.io.retry.RetryInvocationHandler 
   [] - Exception while invoking create of class 
ClientNamenodeProtocolTranslatorPB over 
uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts. Trying to 
fail over immediately.
java.io.IOException: java.lang.InterruptedException
at org.apache.hadoop.ipc.Client.call(Client.java:1449) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1401) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?]
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown 
Source) ~[?:?]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_144]
at java.lang.reflect.Method.invoke(Method.java:498) 
~[?:1.8.0_144]
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.sun.proxy.$Proxy27.create(Unknown Source) ~[?:?]
at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1721)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1657) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1582) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.SuccessFileCommitPolicy.commit(SuccessFileCommitPolicy.java:45)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.stream.StreamingFileCommitter.commitPartitions(StreamingFileCommitter.java:167)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.stream.StreamingFileCommitter.processElement(StreamingFileCommitter.java:144)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 [music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 

Re: 使用k8s 实现flink 的高可用

2020-09-07 文章 Yang Wang
目前Flink on K8s(包括Standalone on K8s和native
K8s)支持的HA方式只有ZK+HDFS/S3/GFS/OSS的方式,配置就是和Standalone是一致的[1]
因为JobManager使用了deployment,所以失败以后会自动被再次拉起,然后从ZK和外部存储进行恢复

主要是以下这些配置:

high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1000
restart-strategy.fixed-delay.delay: 10 s


不过基于PV+FileSystemHAService[2]以及HDFS/S3/GFS/OSS
+ NativeK8sHAService[3]这两种方案都还在设计和讨论中

[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html
[2]. https://issues.apache.org/jira/browse/FLINK-17598
[3]. https://issues.apache.org/jira/browse/FLINK-12884

Best,
Yang

旧城以西  于2020年9月7日周一 下午5:48写道:

> 请教各位大佬,如果通过k8s session 实现flink的ha,如果有实现的了,请提供一下相应配置文件
> 根据官方文档,配置ha需要修改master,那么通过k8s 部署,如何修改master文件


Re: how flink-sql connector kafka reads array json

2020-09-07 文章 Benchao Li
Hi,

这个是一个已知的问题,已经有issue[1] 在跟进解决了。预计在1.12可以使用。

[1] https://issues.apache.org/jira/browse/FLINK-18590

大罗  于2020年9月8日周二 上午10:39写道:

> hi,大家好,我遇到一个问题。
>
> 下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
> "age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?
>
> CREATE TABLE mykafka1 (name String, age Int)
> WITH (
>'connector.type' = 'kafka',
>'format.type' = 'json',
>'update-mode' = 'append'
> );
>
>
> 还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream>,再转换flatMap转换成DataStream,再使用tableEnv.fromDataStream把它变成tableSource?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


回复:使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-07 文章 夏帅
异常日志只有这些么?有没有详细点的

Re: flink-1.11 native k8s state 在 ceph 存储问题

2020-09-07 文章 Yang Wang
其实需要的其实是PV+FileSystemHAService这样的HA方式[1],但目前没有支持,并且如何来做还在讨论中。之前社区也有人有类似的需求,你也可以看下[1]
目前支持的是ZK+HDFS/S3/GFS/OSS等这样的HA方式,
同时在考虑K8s ConfigMap + HDFS/S3/GFS/OSS的方式[2]

[1].
https://lists.apache.org/thread.html/r36882b6c8286132b6fe6851e1c04cd876e9fa35aff9a5b22d181487d%40%3Cuser.flink.apache.org%3E
[2]. https://issues.apache.org/jira/browse/FLINK-17598
[3]. https://issues.apache.org/jira/browse/FLINK-12884


Best,
Yang

chenkaibit  于2020年9月7日周一 下午7:42写道:

> HI:
> 想问下大家在 native k8s 模式下有没有用到 ceph 存储 flink state 和 HA 数据(替代原来的 HDFS).
> 我在测试中发现 flink on native k8s 下没有办法挂载 cephfs,这一块是不是需要修改代码才能支持?我在 flink jira
> 上发现一个相关 issue(https://issues.apache.org/jira/browse/FLINK-15649),在这里想请教下
> flink-1.11 版本如何挂载 cephfs;如果不能挂载 cephfs 的话,还能使用除 hdfs 之外的哪些存储。
> 求大神解惑。
>
> --
>
> Best, yuchuan


how flink-sql connector kafka reads array json

2020-09-07 文章 大罗
hi,大家好,我遇到一个问题。

下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
"age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?

CREATE TABLE mykafka1 (name String, age Int) 
WITH (
   'connector.type' = 'kafka',
   'format.type' = 'json',
   'update-mode' = 'append'
);

还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream>,再转换flatMap转换成DataStream,再使用tableEnv.fromDataStream把它变成tableSource?




--
Sent from: http://apache-flink.147419.n8.nabble.com/


?????? ????StreamingFileSink??hive metadata??????????????????

2020-09-07 文章 MuChen
??




----
??: 
   "MuChen" 
   <9329...@qq.com;
:2020??9??7??(??) 11:01
??:"user-zh"https://s1.ax1x.com/2020/09/07/wn1CFg.png


checkpoint??
2020-09-04 17:17:59
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
 at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736)
 at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)





--  --
??: 
   "user-zh"



?????? 1.11????????????????????????????????????????????????????????????????

2020-09-07 文章 Asahi Lee
??
   
StreamTableEnvironment.from("")??datagen??table??


??
package org.apache.flink.playgrounds.spendreport;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


import java.util.Arrays;


public class Test2 {


  public static void main(String[] args) {
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);


String sourceDDL = "CREATE TABLE datagen ( " +
" f_random INT," +
" f_random_str STRING " 
+
" ) WITH ( " +
" 'connector' = 
'datagen', " +
" 
'rows-per-second'='10'," +
" 
'fields.f_random.min'='1', " +
" 
'fields.f_random.max'='10', " +
" 
'fields.f_random_str.length'='10' " +
" ) ";
bsTableEnv.executeSql(sourceDDL);
for (int i = 0; i < 10; i++) {
  Table datagen = 
bsTableEnv.from("datagen");
  System.out.println(datagen);
}
System.out.println("-" + 
Arrays.toString(bsTableEnv.listTables()));


  }


}



UnnamedTable$0
UnnamedTable$1
UnnamedTable$2
UnnamedTable$3
UnnamedTable$4
UnnamedTable$5
UnnamedTable$6
UnnamedTable$7
UnnamedTable$8
UnnamedTable$9
-[UnnamedTable$0, UnnamedTable$1, UnnamedTable$2, 
UnnamedTable$3, UnnamedTable$4, UnnamedTable$5, UnnamedTable$6, UnnamedTable$7, 
UnnamedTable$8, UnnamedTable$9, datagen]

flink-1.11 native k8s state 在 ceph 存储问题

2020-09-07 文章 chenkaibit
HI:
想问下大家在 native k8s 模式下有没有用到 ceph 存储 flink state 和 HA 数据(替代原来的 HDFS). 我在测试中发现 
flink on native k8s 下没有办法挂载 cephfs,这一块是不是需要修改代码才能支持?我在 flink jira 上发现一个相关 
issue(https://issues.apache.org/jira/browse/FLINK-15649),在这里想请教下 flink-1.11 
版本如何挂载 cephfs;如果不能挂载 cephfs 的话,还能使用除 hdfs 之外的哪些存储。
求大神解惑。

--

Best, yuchuan

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-07 文章 Peihui He
Hi,

就是用hdfs的。

Jingsong Li  于2020年9月7日周一 上午11:16写道:

> 另外,可能和使用本地文件系统有关?换成HDFS试试?
>
> On Mon, Sep 7, 2020 at 11:15 AM Jingsong Li 
> wrote:
>
> > Hi,
> >
> > 可以在JobMaster里面看一下jstack吗?看下具体卡在哪里?
> >
> > On Sat, Sep 5, 2020 at 11:11 PM Peihui He  wrote:
> >
> >> Hi, all
> >>
> >> 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。
> >> 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡着。
> >>
> >> 请问有什么好的解决方式没呢?
> >>
> >> Best Wishes.
> >>
> >> Peihui He  于2020年9月4日周五 下午6:25写道:
> >>
> >>> Hi, all
> >>>
> >>> 当指定partition的时候这个问题通过path 也没法解决了
> >>>
> >>> CREATE TABLE MyUserTable (
> >>>   column_name1 INT,
> >>>   column_name2 STRING,  dt string,) PARTITIONED BY (dt) WITH (
> >>>   'connector' = 'filesystem',   -- required: specify the
> connector
> >>>   'path' = 'file:///path/to/whatever',  -- required: path to a
> directory
> >>>   'format' = 'json', -- required: file system
> connector)
> >>>
> >>>
> >>> select  * from  MyUserTable  limit 10;
> >>>
> >>> job 会一直卡在一个地方
> >>> [image: image.png]
> >>>
> >>> 这种改怎么解决呢?
> >>>
> >>> Peihui He  于2020年9月4日周五 下午6:02写道:
> >>>
>  hi, all
>  我这边用flink sql client 创建表的时候
> 
>  CREATE TABLE MyUserTable (
>    column_name1 INT,
>    column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH (
>    'connector' = 'filesystem',   -- required: specify the
> connector
>    'path' = 'file:///path/to/whatever',  -- required: path to a
> directory
>    'format' = 'json', -- required: file system
> connector)
> 
>  当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/'
>  sql client 提交job会很慢,最后会报错
> 
>  Caused by: org.apache.flink.runtime.rest.util.RestClientException:
>  [Internal server error.,   org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job
> has
>  already been submitted. at
> 
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280)
>  at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at
> 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at
> 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>  at
> 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>  at
> 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>  at
> 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
>  akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
>  scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
>  akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at
>  scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
>  scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>  scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
>  akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
>  akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
>  akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
>  akka.actor.ActorCell.invoke(ActorCell.scala:561) at
>  akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
>  akka.dispatch.Mailbox.run(Mailbox.scala:225) at
>  akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
>  akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
> 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  End of exception on server side>] at
> 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
>  at
> 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
>  at
> 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>  at
> 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> 
> 
>  flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。
> 
>  这种情况不知道有没有遇到过?
> 
>  Best Wishes.
> 
> 
> 
> >>>
> >
> > --
> > Best, Jingsong Lee
> >
>
>
> --
> Best, Jingsong Lee
>


Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-07 文章 Peihui He
Hi,

从jstack 分析,因该是卡在下面这里了。看代码好像是需要遍历所有hdfs上指定path的文件。是这样的不?如果文件很多的话不是要很慢?


"flink-akka.actor.default-dispatcher-30" #103 prio=5 os_prio=0
tid=0x7f6264001000 nid=0x4a93 in Object.wait() [0x7f62964f1000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at org.apache.hadoop.util.concurrent.AsyncGet$Util.wait(AsyncGet.java:59)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1533)
- locked <0xebd49e50> (a org.apache.hadoop.ipc.Client$Call)
at org.apache.hadoop.ipc.Client.call(Client.java:1491)
at org.apache.hadoop.ipc.Client.call(Client.java:1388)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
at com.sun.proxy.$Proxy45.getBlockLocations(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:324)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
- locked <0xebd49d40> (a
org.apache.hadoop.io.retry.RetryInvocationHandler$Call)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy46.getBlockLocations(Unknown Source)
at
org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:864)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:853)
at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:910)
at
org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:267)
at
org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:264)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:274)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:248)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileBlockLocations(HadoopFileSystem.java:98)
at
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:652)
at
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:258)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
at
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:229)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:119)
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:272)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:140)
at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
at
org.apache.flink.runtime.dispatcher.Dispatcher$$Lambda$241/1691741073.get(Unknown
Source)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at

Re: Re: Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-07 文章 Xingbo Huang
Hi,

你这个图挂了。json, csv这种是format[1] 。filesystem,datagen, print,
kafka等这种都是connector[2] ,用来从外部一个数据源读入数据或者写出数据。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
Best,
Xingbo

whh_960101  于2020年9月7日周一 下午5:14写道:

> 您好,
>
> 图中像datagen和print这样的connector更像是表名,之前听说的只有json、csv、filesystem这种类型的connector,请问connector在使用连接器DDL创建表时的作用是什么
>
>
>
>
>
>
>
> 在 2020-09-07 11:33:06,"Xingbo Huang"  写道:
> >Hi,
> >你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。
> >
> >你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
> >[2] 来读取一个dataframe。
> >
> >[1]
> >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >[2]
> >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html
> >
> >Best,
> >Xingbo
> >
> >whh_960101  于2020年9月7日周一 上午11:22写道:
> >
> >> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
> >> dic = {1:'a',2:'b'}
> >> 此时定义udf如下:
> >>
> >> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
> >> def func(dic,f):
> >>..
> >>return L
> >> st_env.register_function("func", func)
> >> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
> >> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
> >> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-04 16:02:56,"Xingbo Huang"  写道:
> >> >Hi,
> >> >
> >> >推荐你使用ddl来声明你上下游用的connector
> >> >
> >> >```
> >> >table_env.execute_sql("""
> >> >CREATE TABLE output (
> >> >data STRING ARRAY
> >> >) WITH (
> >> > 'connector' = 'filesystem',   -- required: specify the connector
> >> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
> >> > 'format' = 'json',
> >> > 'json.fail-on-missing-field' = 'false',
> >> > 'json.ignore-parse-errors' = 'true'
> >> >)
> >> >""")
> >> >
> >>
> >> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
> >> >```
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> >
> >> >
> >> >whh_960101  于2020年9月4日周五 下午3:46写道:
> >> >
> >> >> 您好,我是想让输出insert_into到目标表中,具体如下:
> >> >> st_env=StreamExecutionEnvironment.get_execution_environment()
> >> >> st_env.connect了一个source table(table包含a字段),
> >> >> 然后
> >> >> | st_env.connect(FileSystem().path('tmp')) \ |
> >> >> | | .with_format(OldCsv() |
> >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
> >> >> | | .with_schema(Schema() |
> >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
> >> >> | | .create_temporary_table('sink') |
> >> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
> >> >> 然后我定义了一个udf
> >> >>
> >> >>
> >> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >> def func(a):
> >> >> rec_list = a.split(',')
> >> >> res_arr = np.arrary(rec_list,dtype=str)
> >> >> return res_arr
> >> >> st_env.register_function("func", func)
> >> >> st_env.from_path("source").select("func(a)").insert_into("sink")
> >> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
> >> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
> >> >> res_arr[0],tmp文件里面的字符串就是正确。
> >> >> 我想要得到array,该怎么解决?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-04 15:17:38,"Xingbo Huang"  写道:
> >> >> >Hi,
> >> >> >
> >> >> >你是调试的时候想看结果吗?
> >> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
> >> >> >
> >> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
> >> >> >
> >> >> >```
> >> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
> >> >> >
> >> >> >@udf(input_types=DataTypes.STRING(),
> >> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >> >def func(a):
> >> >> > return np.array([a, a, a], dtype=str)
> >> >> >
> >> >> >table_env.register_function("func", func)
> >> >> >
> >> >> >table.select("func(b)").to_pandas()
> >> >> >```
> >> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink
> >> >> >
> >> >> >Best,
> >> >> >Xingbo
> >> >> >
> >> >> >[1]
> >> >> >
> >> >>
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >> >> >
> >> >> >whh_960101  于2020年9月4日周五 下午2:50写道:
> >> >> >
> >> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> >> >> >> 我的udf输出了一个numpy.array(dtype = str),
> >> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
> >> >> >>
> >> >> >>
> >> >>
> >> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> >> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> >> >> >> 请问这个问题该怎么解决?
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>

Re: Re: Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-07 文章 Xingbo Huang
Hi,


你那个嵌套json串没法用Map来玩(要求所有key是一个类型,value是一个类型),我觉得你得用Row来表示,类型声明类似于Row(String,
Map(String,
String))这种(根据你给的那个{'table':'a','data':{'data1':'b','data2':'c'}这样的数据来的),
你可以认为Row是我们提供的继承自tuple的,所以你的这个json串还得你自己转成Row,方式也简单。
比如你的data 是上面说的{'table':'a','data':{'data1':'b','data2':'c'}
Row('table'=data['table'], 'data'=data['data'])


Best,
Xingbo

whh_960101  于2020年9月7日周一 下午3:11写道:

> 您好,完全叙述一下我的问题:
> 1.首先我需要将一个定义好的python字典作为udf的输入参数,假设这个字典为dic = {1:'a',2:'b'}
>
> 那么我在定义udf的时候,如何写输入(一共两个输入参数,一是这个定义好的字典dic,二是一个DataTypes.ARRAY(DataTypes.STRING()),即下文的re_list)
> 我的方法是:
> class fun(ScalarFunction):
> def __int__(self):
> self.dic = {1:'a',2:'b'}
> def eval(self,re_list):
> #调用dic时,使用self.dic
> #..
> return res
> #返回的输出也是一个python的字典,实际是想输出一个两层嵌套的json字典,{'table':'a','data':{'data1':'b','data2':'c'}
>
>
>
>
>
> st_env.register_function("fun",udf(fun(),DataTypes.ARRAY(DataTypes.STRING()),DataTypes.MAP(DataTypes.STRING(),DataTypes.STRING(
> #这样写是否正确
> st_env.from_path("source").select("fun(t)").execute_insert("sink")
>
>
> 2.我在定义sink表采用
> st_env.execute_sql("""
> CREATE TABLE sink(
> table STRING,
> data STRING
> )WITH(
> 'connector' = 'filesystem',
> 'path' = 'home/res/',
> 'format' = 'csv') #format如果是json就报错ParseException:Encountered"table"at
> line 1,column 43 was expecting one of "CONSTRAINT" ... "PRIMARY"
> """) #我想要在问题1中打印出来输出的json字典,这样的ddl的定义方式是否正确
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-07 11:33:06,"Xingbo Huang"  写道:
> >Hi,
>
> >你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。
> >
>
> >你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
> >[2] 来读取一个dataframe。
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >[2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html
> >
> >Best,
> >Xingbo
> >
> >whh_960101  于2020年9月7日周一 上午11:22写道:
> >
> >> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
> >> dic = {1:'a',2:'b'}
> >> 此时定义udf如下:
> >>
> >>
> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
> >> def func(dic,f):
> >>..
> >>return L
> >> st_env.register_function("func", func)
> >> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
> >> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
> >> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-04 16:02:56,"Xingbo Huang"  写道:
> >> >Hi,
> >> >
> >> >推荐你使用ddl来声明你上下游用的connector
> >> >
> >> >```
> >> >table_env.execute_sql("""
> >> >CREATE TABLE output (
> >> >data STRING ARRAY
> >> >) WITH (
> >> > 'connector' = 'filesystem',   -- required: specify the
> connector
> >> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
> >> > 'format' = 'json',
> >> > 'json.fail-on-missing-field' = 'false',
> >> > 'json.ignore-parse-errors' = 'true'
> >> >)
> >> >""")
> >> >
> >>
> >>
> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
> >> >```
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> >
> >> >
> >> >whh_960101  于2020年9月4日周五 下午3:46写道:
> >> >
> >> >> 您好,我是想让输出insert_into到目标表中,具体如下:
> >> >> st_env=StreamExecutionEnvironment.get_execution_environment()
> >> >> st_env.connect了一个source table(table包含a字段),
> >> >> 然后
> >> >> | st_env.connect(FileSystem().path('tmp')) \ |
> >> >> | | .with_format(OldCsv() |
> >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
> >> >> | | .with_schema(Schema() |
> >> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
> >> >> | | .create_temporary_table('sink') |
> >> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
> >> >> 然后我定义了一个udf
> >> >>
> >> >>
> >>
> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >> def func(a):
> >> >> rec_list = a.split(',')
> >> >> res_arr = np.arrary(rec_list,dtype=str)
> >> >> return res_arr
> >> >> st_env.register_function("func", func)
> >> >> st_env.from_path("source").select("func(a)").insert_into("sink")
> >> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
> >> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
> >> >> res_arr[0],tmp文件里面的字符串就是正确。
> >> >> 我想要得到array,该怎么解决?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-04 15:17:38,"Xingbo Huang"  写道:
> >> >> >Hi,
> >> >> >
> >> >> >你是调试的时候想看结果吗?
> >> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
> >> >> >
> >> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
> >> >> >
> >> >> >```
> >> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
> >> >> >
> >> >> >@udf(input_types=DataTypes.STRING(),
> >> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >> >> >def func(a):
> >> >> > return np.array([a, a, a], 

Re: 1.11版本,创建视图后,根据表名称获取视图表对象,表名为临时名称的问题

2020-09-07 文章 Leonard Xu



>Table table1 = bsTableEnv.from("table1");
>System.out.println(table1);
>// 上面的打印,我预期的表名应该为table1,但是为一个临时表名
Table 对象里的 tableName 没有太大的意义,看了下代码从来没有被赋值过。

我理解 Table 都是从QueryOperation 转化来的,而queryOperation对应的是一个 query (select id, total, 
12 as col_1 from datagen),本身是没有表名的。createTemporaryView("table1", table); 只是 将 
table所对应 queryOperation 对应到了 表名的path(table1)下,  所以 table1 对象拿到的还是同一个query 
operation。

如果想看创建的临时表,可以用bsTableEnv.listTables()查看。

祝好
Leonard Xu

Re: RocksDBStateBackend 问题

2020-09-07 文章 刘建刚
直接存在rocksdb数据库。rocksdb会首先将数据写到内存buffer中(不会太大),等buffer满了再刷到磁盘。相比filesystem的statebackend,rocksdb会因为序列化和反序列化导致处理速度慢一些,但是优势是可以利用磁盘的超大空间来存储更大的状态。

zilong xiao  于2020年9月7日周一 下午5:51写道:

> 可以看下这个文档:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>
> guaishushu1...@163.com  于2020年9月7日周一 下午5:47写道:
>
> > 想问下关于RocksDBStateBackend
> > 是直接把状态存在rocksdb数据库,还是等内存满了再存到RocksDB数据库。如果直接存在RocksDB数据库,那岂不是很影响数据处理速度。
> >
> >
> >
> > guaishushu1...@163.com
> >
>


Re: RocksDBStateBackend 问题

2020-09-07 文章 zilong xiao
可以看下这个文档:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend

guaishushu1...@163.com  于2020年9月7日周一 下午5:47写道:

> 想问下关于RocksDBStateBackend
> 是直接把状态存在rocksdb数据库,还是等内存满了再存到RocksDB数据库。如果直接存在RocksDB数据库,那岂不是很影响数据处理速度。
>
>
>
> guaishushu1...@163.com
>


使用k8s 实现flink 的高可用

2020-09-07 文章 旧城以西
请教各位大佬,如果通过k8s session 实现flink的ha,如果有实现的了,请提供一下相应配置文件
根据官方文档,配置ha需要修改master,那么通过k8s 部署,如何修改master文件

RocksDBStateBackend 问题

2020-09-07 文章 guaishushu1...@163.com
想问下关于RocksDBStateBackend  
是直接把状态存在rocksdb数据库,还是等内存满了再存到RocksDB数据库。如果直接存在RocksDB数据库,那岂不是很影响数据处理速度。



guaishushu1...@163.com


Re: flink-sql1.11写入mysql重复问题

2020-09-07 文章 Leonard Xu
Hi

这个原因在于flink 和 mysql主键是不一致导致的重复数据,你可以把当前mysql表中设置成flink主键的字段添加个unique key 
约束,这样和pk也是等价的。

Best
Leonard

> 在 2020年9月6日,21:21,凌天荣 <466792...@qq.com> 写道:
> 
> 使用flink-sql1.11时,insert进connect为mysql的flink 
> table时,由于mysql里面的id是主键自增的,1.11版本upsert模式设置主键在插入的时候又不能指定为mysql的主键id,只能设置别的字段为flink
>  table的主键,所以会造成有时候mysql里面有两条相同的数据,有啥可解决的吗?(PS:两条是ods两个事实表进行intvate 
> join,正常情况flink table主键和mysql主键一样的话,upsert模式是能回撤旧数据写入新数据的)



flink-CDC 中文乱码问题

2020-09-07 文章 Li,Qian(DXM,PB)
你好:
我在使用Flink CDC SQL CLI向ES6写数据的时候,
由于原数据库编码形式是latin1,这样往ES中写入的中文数据就是乱码了,
请问有什么解决方式么,可以进行编码转换?
谢谢!

李倩



Re:Re: Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-07 文章 whh_960101
您好,
图中像datagen和print这样的connector更像是表名,之前听说的只有json、csv、filesystem这种类型的connector,请问connector在使用连接器DDL创建表时的作用是什么

















在 2020-09-07 11:33:06,"Xingbo Huang"  写道:
>Hi,
>你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。
>
>你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
>[2] 来读取一个dataframe。
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html
>
>Best,
>Xingbo
>
>whh_960101  于2020年9月7日周一 上午11:22写道:
>
>> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
>> dic = {1:'a',2:'b'}
>> 此时定义udf如下:
>>
>> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
>> def func(dic,f):
>>..
>>return L
>> st_env.register_function("func", func)
>> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
>> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
>> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 16:02:56,"Xingbo Huang"  写道:
>> >Hi,
>> >
>> >推荐你使用ddl来声明你上下游用的connector
>> >
>> >```
>> >table_env.execute_sql("""
>> >CREATE TABLE output (
>> >data STRING ARRAY
>> >) WITH (
>> > 'connector' = 'filesystem',   -- required: specify the connector
>> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
>> > 'format' = 'json',
>> > 'json.fail-on-missing-field' = 'false',
>> > 'json.ignore-parse-errors' = 'true'
>> >)
>> >""")
>> >
>>
>> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
>> >```
>> >
>> >Best,
>> >Xingbo
>> >
>> >
>> >
>> >whh_960101  于2020年9月4日周五 下午3:46写道:
>> >
>> >> 您好,我是想让输出insert_into到目标表中,具体如下:
>> >> st_env=StreamExecutionEnvironment.get_execution_environment()
>> >> st_env.connect了一个source table(table包含a字段),
>> >> 然后
>> >> | st_env.connect(FileSystem().path('tmp')) \ |
>> >> | | .with_format(OldCsv() |
>> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
>> >> | | .with_schema(Schema() |
>> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
>> >> | | .create_temporary_table('sink') |
>> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
>> >> 然后我定义了一个udf
>> >>
>> >>
>> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >> def func(a):
>> >> rec_list = a.split(',')
>> >> res_arr = np.arrary(rec_list,dtype=str)
>> >> return res_arr
>> >> st_env.register_function("func", func)
>> >> st_env.from_path("source").select("func(a)").insert_into("sink")
>> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
>> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
>> >> res_arr[0],tmp文件里面的字符串就是正确。
>> >> 我想要得到array,该怎么解决?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-04 15:17:38,"Xingbo Huang"  写道:
>> >> >Hi,
>> >> >
>> >> >你是调试的时候想看结果吗?
>> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
>> >> >
>> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
>> >> >
>> >> >```
>> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>> >> >
>> >> >@udf(input_types=DataTypes.STRING(),
>> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >> >def func(a):
>> >> > return np.array([a, a, a], dtype=str)
>> >> >
>> >> >table_env.register_function("func", func)
>> >> >
>> >> >table.select("func(b)").to_pandas()
>> >> >```
>> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>> >> >
>> >> >whh_960101  于2020年9月4日周五 下午2:50写道:
>> >> >
>> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> >> >> 我的udf输出了一个numpy.array(dtype = str),
>> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>> >> >>
>> >> >>
>> >>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> >> >> 请问这个问题该怎么解决?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang"  写道:
>> >> >> >Hi,
>> >> >> >
>> >> >>
>> >> >>
>> >>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >> >> >
>> >> >> >Best,
>> >> >> >Xingbo
>> >> >> >
>> >> >> >whh_960101  于2020年9月4日周五 上午9:26写道:
>> >> >> >
>> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> >> >> udf定义如下:
>> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> 

flink1.9.3 on yarn 提交任务问题

2020-09-07 文章 宁吉浩
我选择用 bin/flink run -m yarn cluster 的方式提交任务;
遇到了两个问题: 
1. 这两个任务在一个flink集群ui里可见,甚至和之前的flink-session(测试使用)集群在同一个ui里, 这是正常现象吗?
2. 我知道可以通过并行度和slot的指定来确定 tm的数量,查看下图,两个任务一共占用了yarn的8个容器,请问 cpu这个该如何设定?
ps: 之前使用spark 可以直接设定 执行器核心数量,现在找不到方法,总不能一个tm8个solt,就使用一个cpu吧


Re:Re: Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-07 文章 whh_960101
您好,完全叙述一下我的问题:
1.首先我需要将一个定义好的python字典作为udf的输入参数,假设这个字典为dic = {1:'a',2:'b'}
那么我在定义udf的时候,如何写输入(一共两个输入参数,一是这个定义好的字典dic,二是一个DataTypes.ARRAY(DataTypes.STRING()),即下文的re_list)
我的方法是:
class fun(ScalarFunction):
def __int__(self):
self.dic = {1:'a',2:'b'}
def eval(self,re_list):
#调用dic时,使用self.dic
#..
return res 
#返回的输出也是一个python的字典,实际是想输出一个两层嵌套的json字典,{'table':'a','data':{'data1':'b','data2':'c'}




st_env.register_function("fun",udf(fun(),DataTypes.ARRAY(DataTypes.STRING()),DataTypes.MAP(DataTypes.STRING(),DataTypes.STRING(
#这样写是否正确
st_env.from_path("source").select("fun(t)").execute_insert("sink")


2.我在定义sink表采用
st_env.execute_sql("""
CREATE TABLE sink(
table STRING,
data STRING 
)WITH(
'connector' = 'filesystem',
'path' = 'home/res/',
'format' = 'csv') #format如果是json就报错ParseException:Encountered"table"at line 
1,column 43 was expecting one of "CONSTRAINT" ... "PRIMARY"
""") #我想要在问题1中打印出来输出的json字典,这样的ddl的定义方式是否正确











在 2020-09-07 11:33:06,"Xingbo Huang"  写道:
>Hi,
>你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。
>
>你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
>[2] 来读取一个dataframe。
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html
>
>Best,
>Xingbo
>
>whh_960101  于2020年9月7日周一 上午11:22写道:
>
>> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
>> dic = {1:'a',2:'b'}
>> 此时定义udf如下:
>>
>> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
>> def func(dic,f):
>>..
>>return L
>> st_env.register_function("func", func)
>> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
>> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
>> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 16:02:56,"Xingbo Huang"  写道:
>> >Hi,
>> >
>> >推荐你使用ddl来声明你上下游用的connector
>> >
>> >```
>> >table_env.execute_sql("""
>> >CREATE TABLE output (
>> >data STRING ARRAY
>> >) WITH (
>> > 'connector' = 'filesystem',   -- required: specify the connector
>> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
>> > 'format' = 'json',
>> > 'json.fail-on-missing-field' = 'false',
>> > 'json.ignore-parse-errors' = 'true'
>> >)
>> >""")
>> >
>>
>> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
>> >```
>> >
>> >Best,
>> >Xingbo
>> >
>> >
>> >
>> >whh_960101  于2020年9月4日周五 下午3:46写道:
>> >
>> >> 您好,我是想让输出insert_into到目标表中,具体如下:
>> >> st_env=StreamExecutionEnvironment.get_execution_environment()
>> >> st_env.connect了一个source table(table包含a字段),
>> >> 然后
>> >> | st_env.connect(FileSystem().path('tmp')) \ |
>> >> | | .with_format(OldCsv() |
>> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
>> >> | | .with_schema(Schema() |
>> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
>> >> | | .create_temporary_table('sink') |
>> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
>> >> 然后我定义了一个udf
>> >>
>> >>
>> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >> def func(a):
>> >> rec_list = a.split(',')
>> >> res_arr = np.arrary(rec_list,dtype=str)
>> >> return res_arr
>> >> st_env.register_function("func", func)
>> >> st_env.from_path("source").select("func(a)").insert_into("sink")
>> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
>> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
>> >> res_arr[0],tmp文件里面的字符串就是正确。
>> >> 我想要得到array,该怎么解决?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-04 15:17:38,"Xingbo Huang"  写道:
>> >> >Hi,
>> >> >
>> >> >你是调试的时候想看结果吗?
>> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
>> >> >
>> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
>> >> >
>> >> >```
>> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>> >> >
>> >> >@udf(input_types=DataTypes.STRING(),
>> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >> >def func(a):
>> >> > return np.array([a, a, a], dtype=str)
>> >> >
>> >> >table_env.register_function("func", func)
>> >> >
>> >> >table.select("func(b)").to_pandas()
>> >> >```
>> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>> >> >
>> >> >whh_960101  于2020年9月4日周五 下午2:50写道:
>> >> >
>> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> >> >> 我的udf输出了一个numpy.array(dtype = str),
>> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>> >> >>
>> >> >>
>> >>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> >> >> 

Re: 邮件列表地址错误

2020-09-07 文章 darion
了解 谢谢您



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11 streaming file sink to hdfs orc file format could be recognized by hive

2020-09-07 文章 Jingsong Li
Hi,

flink-orc实现的OrcBulkWriterFactory,是有点“ 深入“的,重写了部分ORC的代码,所以没那么好做版本兼容。

你可以考虑使用Hive的streaming写,它使用native的hive orc writer[1],可以对应你需要的那个版本。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_streaming.html#streaming-writing

Best,
Jingsong

On Mon, Sep 7, 2020 at 2:11 PM 大罗  wrote:

> Hi,大家好,我的试验场景是想把kafka的数据直接写到hive(orc format),构建一个实时数仓的理念,但是,通过flink
> sql,又因为依赖问题而不能成功,那么我就想可否通过streaming file sink写入hdfs文件,给hive识别呢。我的试验过程如下:
>
> 我所使用的版本如下:
>
> Hadoop 3.0.0+cdh6.3.2
>
> HDFS 3.0.0+cdh6.3.2
>
> HBase 2.1.0+cdh6.3.2
>
> Hive 2.1.1+cdh6.3.2
>
> Flink 1.11.1
>
> 我在代码中引入依赖'org.apache.flink:flink-orc_2.11:1.11.0', 核心代码如下:
>
> TestStreamFileSinkViaCustomVectorizer.java
> <
> http://apache-flink.147419.n8.nabble.com/file/t909/TestStreamFileSinkViaCustomVectorizer.java>
>
>
>
> 然后,的确可以在hdfs相关目录,找到写入后的文件,比如:/tmp/person_orc/dt_day=2020-09-06/dt_hour=16/part-4-14
>
> 那么我就想这个文件,是否可以给hive的sql读取呢,我的hive sql如下:
>
> CREATE TABLE ods_person_test_os(
> name string, age int)
> partitioned by (dt_day string, dt_hour string)
> STORED AS ORC
> LOCATION 'hdfs://nameservice1/tmp/person_orc/'
> TBLPROPERTIES(
>  'orc.compress'='SNAPPY'
> );
>
> 当我手动添加分区"alter table ods_person_test_os add partition(dt_day='2020-09-06',
> dt_hour='16')
> ",
> 运行查询语句 "select * from ods_person_test_os"后,报错,
> hive-error.txt
> 
>
> 其中核心的关键字"java.lang.ArrayIndexOutOfBoundsException: 7",很明显,数组越界。
>
> 经过仔细的调试后,我认为,问题出现在依赖'orc-core'里。
>
>
> 'org.apache.flink:flink-orc_2.11:1.11.0',会导入依赖'org.apache.orc:orc-core:1.5.6',其中的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
> = OrcFile.WriterVersion.ORC_517,也就是第7个。
>
> ORIGINAL(OrcFile.WriterImplementation.ORC_JAVA, 0),
> HIVE_8732(OrcFile.WriterImplementation.ORC_JAVA, 1),
> HIVE_4243(OrcFile.WriterImplementation.ORC_JAVA, 2),
> HIVE_12055(OrcFile.WriterImplementation.ORC_JAVA, 3),
> HIVE_13083(OrcFile.WriterImplementation.ORC_JAVA, 4),
> ORC_101(OrcFile.WriterImplementation.ORC_JAVA, 5),
> ORC_135(OrcFile.WriterImplementation.ORC_JAVA, 6),
> ORC_517(OrcFile.WriterImplementation.ORC_JAVA, 7),
> ORC_CPP_ORIGINAL(OrcFile.WriterImplementation.ORC_CPP, 6),
> PRESTO_ORIGINAL(OrcFile.WriterImplementation.PRESTO, 6),
> SCRITCHLEY_GO_ORIGINAL(OrcFile.WriterImplementation.SCRITCHLEY_GO,
> 6),
> FUTURE(OrcFile.WriterImplementation.UNKNOWN, 2147483647);
>
>
> 而我的hive版本使用的orc版本为'org.apache.hive:hive-orc:2.1.1-cdh6.3.2',里面的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
> = OrcFile.WriterVersion.HIVE_13083,  并且不存在第7个version。
> ORIGINAL(0),
> HIVE_8732(1),
> HIVE_4243(2),
> HIVE_12055(3),
> HIVE_13083(4),
> FUTURE(2147483647);
>
> 所以,当hive解析orc文件时,使用第7个版本就会报错!这是我的分析!
>
> 为此,我在想能否通过在引入'org.apache.flink:flink-orc_2.11:1.11.0'时,exclude
>
> 'org.apache.orc:orc-core:1.5.6',然后,再引入低版本的,比如'org.apache.orc:orc-core:1.1.2',也就是CURRENT_WRITER
> =
>
> OrcFile.WriterVersion.HIVE_13083,但是,又遇到以下两个问题导致初始化OrcBulkWriterFactory的时候出错:
> 1. 低版本的orc-core中的类'org.apache.orc.TypeDescription' 没有 fromString这个方法;
> 2. 低版本的orc-core中没有'org.apache.orcPhysicalWriter';
>
> 那么,为了匹配不同版本hive使用的orc writer
> version,'flink-orc'是否可以在构建OrcBulkWriterFactory时,多一些动态参数呢?
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best, Jingsong Lee


flink 1.11 streaming file sink to hdfs orc file format could be recognized by hive

2020-09-07 文章 大罗
Hi,大家好,我的试验场景是想把kafka的数据直接写到hive(orc format),构建一个实时数仓的理念,但是,通过flink
sql,又因为依赖问题而不能成功,那么我就想可否通过streaming file sink写入hdfs文件,给hive识别呢。我的试验过程如下:

我所使用的版本如下:

Hadoop 3.0.0+cdh6.3.2

HDFS 3.0.0+cdh6.3.2

HBase 2.1.0+cdh6.3.2

Hive 2.1.1+cdh6.3.2

Flink 1.11.1

我在代码中引入依赖'org.apache.flink:flink-orc_2.11:1.11.0', 核心代码如下:

TestStreamFileSinkViaCustomVectorizer.java

  

然后,的确可以在hdfs相关目录,找到写入后的文件,比如:/tmp/person_orc/dt_day=2020-09-06/dt_hour=16/part-4-14

那么我就想这个文件,是否可以给hive的sql读取呢,我的hive sql如下:

CREATE TABLE ods_person_test_os( 
name string, age int) 
partitioned by (dt_day string, dt_hour string)
STORED AS ORC 
LOCATION 'hdfs://nameservice1/tmp/person_orc/'
TBLPROPERTIES(
 'orc.compress'='SNAPPY'
);

当我手动添加分区"alter table ods_person_test_os add partition(dt_day='2020-09-06',
dt_hour='16')
",
运行查询语句 "select * from ods_person_test_os"后,报错,
hive-error.txt
  

其中核心的关键字"java.lang.ArrayIndexOutOfBoundsException: 7",很明显,数组越界。

经过仔细的调试后,我认为,问题出现在依赖'orc-core'里。

'org.apache.flink:flink-orc_2.11:1.11.0',会导入依赖'org.apache.orc:orc-core:1.5.6',其中的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
= OrcFile.WriterVersion.ORC_517,也就是第7个。

ORIGINAL(OrcFile.WriterImplementation.ORC_JAVA, 0),
HIVE_8732(OrcFile.WriterImplementation.ORC_JAVA, 1),
HIVE_4243(OrcFile.WriterImplementation.ORC_JAVA, 2),
HIVE_12055(OrcFile.WriterImplementation.ORC_JAVA, 3),
HIVE_13083(OrcFile.WriterImplementation.ORC_JAVA, 4),
ORC_101(OrcFile.WriterImplementation.ORC_JAVA, 5),
ORC_135(OrcFile.WriterImplementation.ORC_JAVA, 6),
ORC_517(OrcFile.WriterImplementation.ORC_JAVA, 7),
ORC_CPP_ORIGINAL(OrcFile.WriterImplementation.ORC_CPP, 6),
PRESTO_ORIGINAL(OrcFile.WriterImplementation.PRESTO, 6),
SCRITCHLEY_GO_ORIGINAL(OrcFile.WriterImplementation.SCRITCHLEY_GO,
6),
FUTURE(OrcFile.WriterImplementation.UNKNOWN, 2147483647);

而我的hive版本使用的orc版本为'org.apache.hive:hive-orc:2.1.1-cdh6.3.2',里面的org.apache.orc.OrcFile.WriterVersion定义如下,而且CURRENT_WRITER
= OrcFile.WriterVersion.HIVE_13083,  并且不存在第7个version。
ORIGINAL(0),
HIVE_8732(1),
HIVE_4243(2),
HIVE_12055(3),
HIVE_13083(4),
FUTURE(2147483647);

所以,当hive解析orc文件时,使用第7个版本就会报错!这是我的分析!

为此,我在想能否通过在引入'org.apache.flink:flink-orc_2.11:1.11.0'时,exclude
'org.apache.orc:orc-core:1.5.6',然后,再引入低版本的,比如'org.apache.orc:orc-core:1.1.2',也就是CURRENT_WRITER
=
OrcFile.WriterVersion.HIVE_13083,但是,又遇到以下两个问题导致初始化OrcBulkWriterFactory的时候出错:
1. 低版本的orc-core中的类'org.apache.orc.TypeDescription' 没有 fromString这个方法;
2. 低版本的orc-core中没有'org.apache.orcPhysicalWriter';

那么,为了匹配不同版本hive使用的orc writer
version,'flink-orc'是否可以在构建OrcBulkWriterFactory时,多一些动态参数呢?






--
Sent from: http://apache-flink.147419.n8.nabble.com/