退订
退订
Re: 退订
退订请发送任意邮件到 user-zh-unsubscr...@flink.apache.org ,可以参考 https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list On Fri, Apr 14, 2023 at 7:32 PM daniel sun wrote: > 退订 > zjw 于2023年4月14日 周五下午7:17写道: > > > >
Re: 流数据转化为json
Hi, 你使用的那个 Flink 版本,建议直接参考 Flink 官方 kafka connector 文档[1]。 转换为 Json 数据格式可以使用 flink-json format [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/json/ Best, Weihua On Fri, Apr 14, 2023 at 7:17 PM 小昌同学 wrote: > 你好,请问一下上游的数据是 > SingleOutputStreamOperator outPutInfoStream = > keyedStream.process(new KeyStreamFunc()); > 数据样式为:InPutInfo[phone='123456',workId='001'] > 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案; > 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点 > | | > 小昌同学 > | > | > ccc0606fight...@163.com > |
Flink Streaming API ElasticSearch Connector 长连接
各位大佬好! 我在使用Flink ES连接器的时候,有时候报以下错误: Caused by: java.io.IOException breakpoint : 远程主机强迫关闭了一个现有的连接 初步判断,应该是没有维持住长连接保活,所以如果一段时间不写入数据,连接就断了。 请问各位大佬,ElasticSearch Connector 有什么参数可以维持长连接吗? ElasticSearch Connector 代码如下: jsonStringStream .sinkTo( new Elasticsearch7SinkBuilder() // Instructs the sink to emit after every Nth buffered element .setBulkFlushMaxActions(1) .setHosts( new HttpHost( Conn.getInstance().getProp("elasticsearch.hosts"), Integer.parseInt( Conn.getInstance().getProp("elasticsearch.port") ), Conn.getInstance().getProp("elasticsearch.scheme") ) ) .setEmitter( (page, context, indexer) -> indexer.add( new IndexRequest(Conn.getInstance().getProp("elasticsearch.index.page")) .source(page, XContentType.JSON) ) ) .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000) .build()); 报错如下: [cid:71c0dada-02b0-4a0d-b16f-97bb0d65167f] 多谢指教!
Re: 用Flink Table API和RocksDB不能正常升级状态数据结构
Hi 目前增减列数据会导致状态无法兼容 Best, Shammon FY On Fri, Apr 14, 2023 at 9:09 PM Elvis Chen wrote: > 我们正在使用flink-1.16.0的Table API和RocksDB作为后端,为我们的用户提供运行SQL > > queries的服务。表格是使用Avro模式创建的,当以兼容的方式更改模式,例如添加一个带默认值的field时,我们无法从savepoint恢复作业。这是在数据结构升级后的报错: > Caused by: org.apache.flink.util.StateMigrationException: The new state > serializer > (org.apache.flink.table.runtime.typeutils.RowDataSerializer@aad5b03a) must > not be incompatible with the old state serializer > (org.apache.flink.table.runtime.typeutils.RowDataSerializer@9d089984) > ... >
Re: 流数据转化为json
Hi 对于kafka的问题,使用print或者其他方式有数据输出吗?可以通过这种方式确认一下是作业本身的数据问题还是kafka的问题 Best, Shammon FY On Fri, Apr 14, 2023 at 7:17 PM 小昌同学 wrote: > 你好,请问一下上游的数据是 > SingleOutputStreamOperator outPutInfoStream = > keyedStream.process(new KeyStreamFunc()); > 数据样式为:InPutInfo[phone='123456',workId='001'] > 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案; > 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点 > | | > 小昌同学 > | > | > ccc0606fight...@163.com > |
Re: sink mysql id自增表数据会丢失
Hi 如果想使用mysql的自增主键,应该是在插入的时候不要写自增主键的列吧,可以在insert的时候直接指定需要插入的列试试? On Sun, Apr 16, 2023 at 7:58 PM Jeff wrote: > sink数据到mysql catalog内的表时,当表只一个自增主键id无其唯一索引时,同一批写入的数据只会保存一条,其它数据会丢失。 > > > mysql内表ddl: > > create table test (id bigint primary key auto_increment , passport > varchar); > > > flink sql: > insert into mysql_catalog.test select 0, passport from source_table; > > 之所以select 0是表示使用物理表的自增值。