回复:回复:使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-07 文章
就第二次提供的日志看,好像是你的namenode出现的问题


--
发件人:MuChen <9329...@qq.com>
发送时间:2020年9月8日(星期二) 10:56
收件人:user-zh@flink.apache.org 夏帅 ; user-zh 

主 题:回复: 回复:使用StreamingFileSink向hive metadata中增加分区部分失败

在checkpoint失败的时间,tm上还有一些info和warn级别的日志:
2020-09-04 17:17:59,520 INFO org.apache.hadoop.io.retry.RetryInvocationHandler 
[] - Exception while invoking create of class 
ClientNamenodeProtocolTranslatorPB over 
uhadoop-op3raf-master2/10.42.52.202:8020 after 14 fail over attempts. Trying to 
fail over immediately.
java.io.IOException: java.lang.InterruptedException
at org.apache.hadoop.ipc.Client.call(Client.java:1449) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1401) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.sun.proxy.$Proxy26.create(Unknown Source) ~[?:?]
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:295)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) ~[?:?]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_144]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_144]
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at com.sun.proxy.$Proxy27.create(Unknown Source) ~[?:?]
at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1721)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1657) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1582) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786) 
~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
 ~[flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.SuccessFileCommitPolicy.commit(SuccessFileCommitPolicy.java:45)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.stream.StreamingFileCommitter.commitPartitions(StreamingFileCommitter.java:167)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.filesystem.stream.StreamingFileCommitter.processElement(StreamingFileCommitter.java:144)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[music_copyright-1.0-SNAPSHOT-jar-with-dependenci

回复:使用StreamingFileSink向hive metadata中增加分区部分失败

2020-09-07 文章
异常日志只有这些么?有没有详细点的

回复: flink1.9写权限认证的es6

2020-07-16 文章
get到了





来自钉钉专属商务邮箱--
发件人:Yangze Guo
日 期:2020年07月17日 13:38:35
收件人:user-zh
主 题:Re: flink1.9写权限认证的es6

Hi,

SQL添加认证的逻辑已经在FLINK-18361[1] 中完成了,1.12版本会支持这个功能

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Fri, Jul 17, 2020 at 10:12 AM Dream-底限  wrote:
>
> hi:
> 请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下



回复:flink1.9写权限认证的es6

2020-07-16 文章
你好,请问是FlinkSQL么
FLinkSQL可以参考下这份邮件
http://apache-flink.147419.n8.nabble.com/ddl-es-td2094.html
DataStream可以尝试自定义ElasticsearchSink实现权限认证


--
发件人:Dream-底限 
发送时间:2020年7月17日(星期五) 10:12
收件人:user-zh 
主 题:flink1.9写权限认证的es6

hi:
请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下


回复:flink connector formats问题

2020-07-16 文章
你好,这个是可以进行自定义的
参考https://jxeditor.github.io/2020/06/11/FlinkSQL%E8%87%AA%E5%AE%9A%E4%B9%89FORMAT_TYPE/


--
发件人:酷酷的浑蛋 
发送时间:2020年7月17日(星期五) 10:42
收件人:user-zh 
主 题:flink connector formats问题

请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢?
目前flink支持的:
| 格式 | 支持的连接器 |
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |

回复:回复: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章
你好,
可以参考下这个问题的解决
http://apache-flink.147419.n8.nabble.com/Table-options-do-not-contain-an-option-key-connector-for-discovering-a-connector-td4767.html


--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:50
收件人:user-zh ; 夏帅 ; Leonard Xu 

主 题:Re: 回复: 不能实时读取实时写入到 Hive 的数据


应该是我没有理解 partitiion-commit 
的意思,我看这里有文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit


CREATE TABLE kafka_ods_wms_pick_order (
 order_no STRING,
 status INT,
 dispatch_time TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'ods_wms_pick_order',
 'properties.bootstrap.servers' = ':9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE hive_ods_wms_pick_order (
  order_no STRING,
  status INT,
  dispatch_time TIMESTAMP
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

INSERT INTO TABLE hive_ods_wms_pick_order SELECT order_no, status, 
dispatch_time, DATE_FORMAT(dispatch_time, '-MM-dd'), 
DATE_FORMAT(dispatch_time, 'HH') FROM kafka_ods_wms_pick_order;
SELECT * FROM hive_ods_wms_pick_order /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-07-24') */;




wangl...@geekplus.com.cn


Sender: 夏帅
Send Time: 2020-07-14 16:43
Receiver: user-zh; xbjtdcq
Subject: 回复:Re: 不能实时读取实时写入到 Hive 的数据
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么
--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh ; xbjtdcq 
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据
我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。 
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 
谢谢,
王磊
wangl...@geekplus.com.cn 
发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei
你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;
就在你看得这个页面应该有对应的文档说明如何读取hive数据。
祝好,
Leonard Xu
> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
> 
> 
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> 创建 kafka 表,通过 SQL 实时写入 Hive.
> 
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
> 页面观察 这个 select * from hive_table 的 job 已经结束了。
> 
> 谢谢,
> 王磊
> 
> 
> 
> wangl...@geekplus.com.cn 
> 


回复:Re: 不能实时读取实时写入到 Hive 的数据

2020-07-14 文章
你好,
这说明写入的hive文件没有进行rollup,可以贴下SQL么


--
发件人:wangl...@geekplus.com.cn 
发送时间:2020年7月14日(星期二) 16:40
收件人:user-zh ; xbjtdcq 
主 题:回复: Re: 不能实时读取实时写入到 Hive 的数据


我加上了这个 tablehint 。
任务提交上去了,但客户端还是没有任何返回显示。 
我到 hadoop 集群上看了下 hive 表所在的这个目录,所有的文件都是 .part 开头的 inprogress 文件。 

谢谢,
王磊



wangl...@geekplus.com.cn 

发件人: Leonard Xu
发送时间: 2020-07-14 16:17
收件人: user-zh
主题: Re: 不能实时读取实时写入到 Hive 的数据
HI, wanglei

你开启了 streaming-source.enable 
吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.consume-start-offset'='2020-05-20') */;

就在你看得这个页面应该有对应的文档说明如何读取hive数据。

祝好,
Leonard Xu

> 在 2020年7月14日,15:47,wangl...@geekplus.com.cn 写道:
> 
> 
> 试验了一下 Flink-1.11  hive streaming 的功能
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
> 
> 创建 kafka 表,通过 SQL 实时写入 Hive.
> 
> 但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 
> 页面观察 这个 select * from hive_table 的 job 已经结束了。
> 
> 谢谢,
> 王磊
> 
> 
> 
> wangl...@geekplus.com.cn 
> 



回复:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章
你好,
本质还是StreamingFileSink,所以目前只能append


--
发件人:Zhou Zach 
发送时间:2020年7月14日(星期二) 10:56
收件人:user-zh 
主 题:Re:Re: flink 同时sink hbase和hive,hbase少记录




Hi Leonard,
原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式














在 2020-07-14 09:56:00,"Leonard Xu"  写道:
>Hi,
>
>> 在 2020年7月14日,09:52,Zhou Zach  写道:
>> 
   |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
 cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>
>看下这个抽取出来的rowkey是否有重复的呢?
>
>祝好,
>Leonard Xu


回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章
你好,
你设置了1个小时的
SINK_PARTITION_COMMIT_DELAY


--
发件人:Zhou Zach 
发送时间:2020年7月13日(星期一) 17:09
收件人:user-zh 
主 题:Re:Re: Re: Table options do not contain an option key 'connector' for 
discovering a connector.

开了checkpoint,
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)




间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据














在 2020-07-13 16:52:16,"Jingsong Li"  写道:
>有开checkpoint吧?delay设的多少?
>
>Add partition 在 checkpoint完成 + delay的时间后
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach  wrote:
>
>> Hi,
>> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add
>> partition到hive表吗,我当前设置了参数
>> 'sink.partition-commit.policy.kind'='metastore'
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> At 2020-07-13 15:01:28, "Jingsong Li"  wrote:
>> >Hi,
>> >
>> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
>> >
>> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach  wrote:
>> >
>> >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置
>> >>
>> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
>> >> h','sink.partition-commit.policy.kind'='success-file');
>> >> 也报错误
>> >> query:
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |
>> >> |CREATE TABLE hive_table (
>> >> |  user_id STRING,
>> >> |  age INT
>> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
>> >> TBLPROPERTIES (
>> >> |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>> >> |  'sink.partition-commit.trigger'='partition-time',
>> >> |  'sink.partition-commit.delay'='1 h',
>> >> |  'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> |)
>> >> |
>> >> |""".stripMargin)
>> >>
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |CREATE TABLE kafka_table (
>> >> |uid VARCHAR,
>> >> |-- uid BIGINT,
>> >> |sex VARCHAR,
>> >> |age INT,
>> >> |created_time TIMESTAMP(3),
>> >> |WATERMARK FOR created_time as created_time - INTERVAL '3'
>> SECOND
>> >> |) WITH (
>> >> |'connector.type' = 'kafka',
>> >> |'connector.version' = 'universal',
>> >> | 'connector.topic' = 'user',
>> >> |-- 'connector.topic' = 'user_long',
>> >> |'connector.startup-mode' = 'latest-offset',
>> >> |'connector.properties.zookeeper.connect' =
>> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >> |'connector.properties.bootstrap.servers' =
>> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >> |'connector.properties.group.id' = 'user_flink',
>> >> |'format.type' = 'json',
>> >> |'format.derive-schema' = 'true'
>> >> |)
>> >> |""".stripMargin)
>> >>
>> >>
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |INSERT INTO hive_table
>> >> |SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'),
>> >> DATE_FORMAT(created_time, 'HH')
>> >> |FROM kafka_table
>> >> |
>> >> |""".stripMargin)
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
>> >> |
>> >> |""".stripMargin)
>> >> .print()
>> >> 错误栈:
>> >> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException:
>> >> Unable to create a sink for writing table
>> >> 'default_catalog.default_database.hive_table'.
>> >>
>> >> Table options are:
>> >>
>> >> 'hive.storage.file-format'='parquet'
>> >> 'is_generic'='false'
>> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> 'sink.partition-commit.delay'='1 h'
>> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> 'sink.partition-commit.trigger'='partition-time'
>> >> at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >> at
>> >>
>> 

回复:flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-10 文章
你好,
我这边同样的代码,并没有出现类似的问题
是本地跑么,可以提供下日志信息么?



回复:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 文章
感谢


回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 文章
你好,
可以看看你的代码结构是不是以下这种
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
  ..
tableEnv.execute("")
如果是的话,可以尝试使用bsEnv.execute("")
1.11对于两者的execute代码实现有改动


--
发件人:Zhou Zach 
发送时间:2020年7月8日(星期三) 15:30
收件人:Flink user-zh mailing list 
主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology

代码在flink 
1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
Exception in thread "main" java.lang.IllegalStateException: No operators 
defined in streaming topology. Cannot generate StreamGraph.
at 
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)


但是,数据是正常sink到了hbase,是不是executeSql误报了。。。




query:
streamTableEnv.executeSql(
  """
|
|CREATE TABLE `user` (
|uid BIGINT,
|sex VARCHAR,
|age INT,
|created_time TIMESTAMP(3),
|WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
|) WITH (
|'connector.type' = 'kafka',
|'connector.version' = 'universal',
|-- 'connector.topic' = 'user',
|'connector.topic' = 'user_long',
|'connector.startup-mode' = 'latest-offset',
|'connector.properties.group.id' = 'user_flink',
|'format.type' = 'json',
|'format.derive-schema' = 'true'
|)
|""".stripMargin)






streamTableEnv.executeSql(
  """
|
|CREATE TABLE user_hbase3(
|rowkey BIGINT,
|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
|) WITH (
|'connector.type' = 'hbase',
|'connector.version' = '2.1.0',
|'connector.table-name' = 'user_hbase2',
|'connector.zookeeper.znode.parent' = '/hbase',
|'connector.write.buffer-flush.max-size' = '10mb',
|'connector.write.buffer-flush.max-rows' = '1000',
|'connector.write.buffer-flush.interval' = '2s'
|)
|""".stripMargin)


streamTableEnv.executeSql(
  """
|
|insert into user_hbase3
|SELECT uid,
|
|  ROW(sex, age, created_time ) as cf
|  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
created_time from `user`)
|
|""".stripMargin)










回复:FlinkKafkaProducer没有写入多个topic的功能

2020-07-07 文章
你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景
class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
  override def serialize(element: DemoBean, timestamp: lang.Long): 
ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, 
element.getValue)
  }
}
--
发件人:18579099...@163.com <18579099...@163.com>
发送时间:2020年7月8日(星期三) 10:59
收件人:user-zh 
主 题:FlinkKafkaProducer没有写入多个topic的功能

我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
但是FlinkKafkaProducer好像只能写入一个主题里面?



18579099...@163.com



回复:【Flink的shuffle mode】

2020-07-07 文章
补充: 1.11的shuffle-mode配置的默认值为ALL_EDGES_BLOCKING
共有
ALL_EDGES_BLOCKING(等同于batch)
FORWARD_EDGES_PIPELINEDPOINTWISE_EDGES_PIPELINED
ALL_EDGES_PIPELINED(等同于pipelined)对于pipelined多出了两种选择


--
发件人:忝忝向仧 <153488...@qq.com>
发送时间:2020年7月7日(星期二) 23:37
收件人:user-zh 
主 题:回复: 【Flink的shuffle mode】

如果是批的模式,怎么在应用程序里面指定shuffle_mode呢?
另外,下面提到如果是流的计算,一定是pipeline模式.
那为什么我使用datastream做keyby流操作后,跟踪源码它的mode是UNDEFINED呢?
谢谢.




--原始邮件--
发件人:"Jingsong Li"

回复:【Flink的shuffle mode】

2020-07-07 文章
你好:
问题1,指定shuffle_mode
tEnv.getConfig.getConfiguration.setString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
 "pipeline")
问题2,mode是UNDEFINED的概念
使用UNDEFINED并不是说模式没有定义,而是由框架自己决定
The shuffle mode is undefined. It leaves it up to the framework to decide the 
shuffle mode.



--
发件人:忝忝向仧 <153488...@qq.com>
发送时间:2020年7月7日(星期二) 23:37
收件人:user-zh 
主 题:回复: 【Flink的shuffle mode】

如果是批的模式,怎么在应用程序里面指定shuffle_mode呢?
另外,下面提到如果是流的计算,一定是pipeline模式.
那为什么我使用datastream做keyby流操作后,跟踪源码它的mode是UNDEFINED呢?
谢谢.




--原始邮件--
发件人:"Jingsong Li"

回复:【Flink的shuffle mode】

2020-07-05 文章
你好,可以参考下ExecutionConfigOptions,OptimizerConfigOptions和GlobalConfiguration,里面有比较清楚地介绍




--
发件人:忝忝向仧 <153488...@qq.com>
发送时间:2020年7月6日(星期一) 12:16
收件人:user-zh 
主 题:回复:【Flink的shuffle mode】

那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送?



发自我的iPhone


-- 原始邮件 --
发件人: Jingsong Li 

回复:Position out of bounds.

2020-07-02 文章
不好意思,看错了,这里是自增了





来自钉钉专属商务邮箱--
发件人:xuhaiLong
日 期:2020年07月02日 18:46:37
收件人:夏帅
抄 送:user-zh
主 题:回复:Position out of bounds.

感谢
没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案?


| |
夏*
|
|
邮箱:xiagu...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月02日 18:39,夏帅 写道:
你好,请问解决了么,我看了下源码,好像是一个bug
DataOutputSerializer

@Override
public void write(int b) throws IOException {
  if (this.position >= this.buffer.length) {
 resize(1);
  }
  this.buffer[this.position++] = (byte) (b & 0xff);
}
此处position应该自增


--
发件人:xuhaiLong 
发送时间:2020年7月2日(星期四) 17:46
收件人:flink 中文社区 
主 题:Position out of bounds.


flink 1.10  onYarn
job 中 有一个MapState[Long,Bean]
https://www.helloimg.com/image/Pe1QR
程序启动一段时间(20分钟)后出现了 附件中的异常
查看对应源码也没看懂是什么引起的异常
https://www.helloimg.com/image/Peqc5




回复:Position out of bounds.

2020-07-02 文章
你好,请问解决了么,我看了下源码,好像是一个bug
DataOutputSerializer

@Override
public void write(int b) throws IOException {
   if (this.position >= this.buffer.length) {
  resize(1);
   }
   this.buffer[this.position++] = (byte) (b & 0xff);
}
此处position应该自增


--
发件人:xuhaiLong 
发送时间:2020年7月2日(星期四) 17:46
收件人:flink 中文社区 
主 题:Position out of bounds.

  
flink 1.10  onYarn
job 中 有一个MapState[Long,Bean]
https://www.helloimg.com/image/Pe1QR
 程序启动一段时间(20分钟)后出现了 附件中的异常
查看对应源码也没看懂是什么引起的异常
https://www.helloimg.com/image/Peqc5



回复:flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?

2020-07-01 文章
你好,可以尝试自定义实现Kafka011TableSourceSinkFactory和Kafka011TableSink来实现exactly-once

Kafka011TableSink


@Override
protected SinkFunction createKafkaProducer(
  String topic,
  Properties properties,
  SerializationSchema serializationSchema,
  Optional> partitioner) {
   return new FlinkKafkaProducer011<>(
  topic,
  new KeyedSerializationSchemaWrapper<>(serializationSchema),
  properties,
  partitioner,
  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
  5);
}
如果想要修改配置的话,具体可以参考KafkaTableSourceSinkFactoryBase

参考: 
https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
--
发件人:静谧雨寒 
发送时间:2020年7月1日(星期三) 14:33
收件人:user-zh 
主 题:flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?

flink sql CREATE TABLE kafka sink表,开启checkpoint后,如何配置sql 
sink表使用两阶事务提交,exactly-once一致性保证 ?
官档说法:
Consistency guarantees: By default, a Kafka sink ingests data with 
at-least-once guarantees into a Kafka topic if the query is executed with 
checkpointing enabled.,
CREATE TABLE 默认是 at-least-once



回复:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

2020-06-29 文章
你好,我试了一下,纯DataStream的方式是可以使用的,具体使用参考`flink-formats\flink-parquet\src\test\java\org\apache\flink\formats\parquet\avro\ParquetStreamingFileSinkITCase`

在Table转DataStream的方式中,我是先将Table转换为DataStream[Row],然后再进行转换生成DataStream[GenericRecord]
dataStream.map(x => {
  ...val fields = new util.ArrayList[Schema.Field]
  fields.add(new Schema.Field("platform", 
create(org.apache.avro.Schema.Type.STRING), "platform", null))
  fields.add(new Schema.Field("event", 
create(org.apache.avro.Schema.Type.STRING), "event", null))
  fields.add(new Schema.Field("dt", create(org.apache.avro.Schema.Type.STRING), 
"dt", null))
  val parquetSinkSchema: Schema = createRecord("pi", "flinkParquetSink",
"flink.parquet", true, fields)
  val record = new 
GenericData.Record(parquetSinkSchema).asInstanceOf[GenericRecord]
  record.put("platform", x.get(0))
  record.put("event", x.get(1))
  record.put("dt", x.get(2))
  record
})



--
发件人:yingbo yang 
发送时间:2020年6月29日(星期一) 10:04
收件人:夏帅 
抄 送:user-zh 
主 题:Re: flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

你好:
可以使用 GenericRecordAvroTypeInfo 这个类型,但是这个类型只适合于 table 中只有一个 字段的情况;否则会出现异常:
代码:
ArrayList fields = new 
ArrayList();
fields.add(new org.apache.avro.Schema.Field("id", 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), "id", 
JsonProperties.NULL_VALUE));
fields.add(new org.apache.avro.Schema.Field("time", 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), "time", 
JsonProperties.NULL_VALUE));
org.apache.avro.Schema parquetSinkSchema = 
org.apache.avro.Schema.createRecord("pi", "flinkParquetSink", "flink.parquet", 
true, fields);
String fileSinkPath = "./xxx.text/rs6/";


GenericRecordAvroTypeInfo genericRecordAvroTypeInfo = new 
GenericRecordAvroTypeInfo(parquetSinkSchema);
DataStream testDataStream1 = flinkTableEnv.toAppendStream(test, 
genericRecordAvroTypeInfo);

testDataStream1.print().setParallelism(1);


StreamingFileSink parquetSink = StreamingFileSink.
forBulkFormat(new Path(fileSinkPath),
ParquetAvroWriters.forGenericRecord(parquetSinkSchema))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
testDataStream1.addSink(parquetSink).setParallelism(1);
flinkTableEnv.execute("ReadFromKafkaConnectorWriteToLocalFileJava");

异常:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/Users/yyb/Software/localRepository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/yyb/Software/localRepository/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
root
 |-- id: STRING
 |-- time: STRING

09:40:35,872 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - class org.apache.flink.types.Row does not contain a getter for field fields
09:40:35,874 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - class org.apache.flink.types.Row does not contain a setter for field fields
09:40:35,874 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - Class class org.apache.flink.types.Row cannot be used as a POJO type because 
not all fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" for details 
of the effect on performance.
09:40:36,191 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - class org.apache.flink.types.Row does not contain a getter for field fields
09:40:36,191 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - class org.apache.flink.types.Row does not contain a setter for field fields
09:40:36,191 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - Class class org.apache.flink.types.Row cannot be used as a POJO type because 
not all fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" for details 
of the effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [2] 
of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@2149594a] 
does not match the number[1] of requested type 
[GenericRecord("{"type":"error","name":"pi","namespace":"flink.parquet","doc":"flinkParquetSink","fields":[{"name":"id","type":&quo

回复:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

2020-06-28 文章
你好,这个问题从异常来看是使用TupleTypeInfo导致的,可以试下使用GenericRecordAvroTypeInfo


--
发件人:yingbo yang 
发送时间:2020年6月28日(星期日) 17:38
收件人:user-zh 
主 题:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

Hi:
在使用 ParquetAvroWriters.forGenericRecord(Schema schema)
写parquet文件的时候 出现 类转化异常:
下面是我的代码:

// //transfor 2 dataStream // TupleTypeInfo tupleTypeInfo = new
TupleTypeInfo(GenericData.Record.class,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
TupleTypeInfo tupleTypeInfo = new
TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
 DataStream testDataStream = flinkTableEnv.toAppendStream(test, tupleTypeInfo);
 testDataStream.print().setParallelism(1);
ArrayList fields = new
ArrayList();
 fields.add(new org.apache.avro.Schema.Field("id",
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
"id", JsonProperties.NULL_VALUE));
 fields.add(new org.apache.avro.Schema.Field("time",
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
"time", JsonProperties.NULL_VALUE));
 org.apache.avro.Schema parquetSinkSchema =
org.apache.avro.Schema.createRecord("pi", "flinkParquetSink",
"flink.parquet", true, fields);
 String fileSinkPath = "./xxx.text/rs6/";
StreamingFileSink parquetSink = StreamingFileSink.
 forBulkFormat(new Path(fileSinkPath),
 ParquetAvroWriters.forGenericRecord(parquetSinkSchema))
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .build();
 testDataStream.addSink(parquetSink).setParallelism(1);
 flinkTableEnv.execute("ReadFromKafkaConnectorWriteToLocalFileJava");


下面是异常:

09:29:50,283 INFO  org.apache.flink.runtime.taskmanager.Task
  - Sink: Unnamed (1/1) (79505cb6ab2df38886663fd99461315a)
switched from RUNNING to FAILED.09:29:50,283 INFO
org.apache.flink.runtime.taskmanager.Task - Sink:
Unnamed (1/1) (79505cb6ab2df38886663fd99461315a) switched from RUNNING
to FAILED.java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.apache.avro.generic.IndexedRecord

at org.apache.avro.generic.GenericData.getField(GenericData.java:697)

at 
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:188)

at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)

at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)

at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)

at 
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)

at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)09:29:50,284

INFO  org.apache.flink.runtime.taskmanager.Task -
Freeing task resources for Sink: Unnamed (1/1)
(79505cb6ab2df38886663fd99461315a).09:29:50,285 INFO
org.apache.flink.runtime.taskmanager.Task -
Ensuring all FileSystem streams are closed for task Sink: Unnamed
(1/1) (79505cb6ab2df38886663fd99461315a) [FAILED]09:29:50,289

INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor-
Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Unnamed (1/1)
79505cb6ab2df38886663fd99461315a.09:29:50,293 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink:
Unnamed (1/1) (79505cb6ab2df38886663fd99461315a)

switched from RUNNING to FAILED.java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.apache.avro.generic.IndexedRecord at

Flink新出的OrcBulkWriterFactory有没有大佬给个详细的Demo

2020-06-08 文章
自己在使用时,会有文件生成,但是文件内并不包含数据

回复:Flink1.11-release编译部署后sql-client的bug

2020-06-01 文章
好的,感谢


--
发件人:godfrey he 
发送时间:2020年6月2日(星期二) 12:32
收件人:user-zh 
抄 送:夏帅 
主 题:Re: Flink1.11-release编译部署后sql-client的bug

Hi, 夏帅

感谢反馈问题,我建了一个issue https://issues.apache.org/jira/browse/FLINK-18055,应该今天就可以fix

Best,
Godfrey
Leonard Xu  于2020年6月2日周二 下午12:13写道:
Hi, 夏帅

 感谢反馈,这应该是个bug,我 这边本地也复现了,我先看下哈

 祝好,
 Leonard Xu

 > 在 2020年6月2日,11:57,夏帅  写道:
 > 
 > 是我编译的问题么,在window下编译的



Flink1.11-release编译部署后sql-client的bug

2020-06-01 文章

大家好,有人编译部署过flink-1.11-release么,为什么我使用sql-client时设置了catalog
但是并不生效,顺带自动补全也不太好使
是我编译的问题么,在window下编译的
编译步骤见链接
https://jxeditor.github.io/2020/06/01/Flink1.11.0%E7%BC%96%E8%AF%91/
$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 
2018-06-18T02:33:14+08:00)

Flink SQL> show catalogs;
default_catalog
hive

Flink SQL> use  catalog hive;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name 
[`hive`] does not exist.




回复:Kafka Consumer反序列化错问题

2020-05-29 文章
可以排除一下是否是jar包冲突


--
发件人:Even <452232...@qq.com>
发送时间:2020年5月29日(星期五) 16:17
收件人:user-zh 
主 题:Kafka Consumer反序列化错问题

Hi!
请教一个Kafka Consumer反序列问题:
一个kafkaconsumerjob 提交到Flink session cluster时运行稳定,但是独立提交到到Flink 
per-job cluster 就报kafka反序列化错,报错信息如下:
其中flink版本为1.10,kafka版本为kafka_2.12-2.1.0;代码中consumer配置为val data = 
env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), 
properties))
2020-05-2717:05:22
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.