退订

2023-04-17 文章 杨伟伟
退订

Re: 退订

2023-04-17 文章 Shammon FY
退订请发送任意邮件到 user-zh-unsubscr...@flink.apache.org
,可以参考
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

On Fri, Apr 14, 2023 at 7:32 PM daniel sun  wrote:

> 退订
> zjw 于2023年4月14日 周五下午7:17写道:
>
> >
>


Re: 流数据转化为json

2023-04-17 文章 Weihua Hu
Hi,

你使用的那个 Flink 版本,建议直接参考 Flink 官方 kafka connector 文档[1]。
转换为 Json 数据格式可以使用 flink-json  format

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/json/

Best,
Weihua


On Fri, Apr 14, 2023 at 7:17 PM 小昌同学  wrote:

> 你好,请问一下上游的数据是
> SingleOutputStreamOperator outPutInfoStream =
> keyedStream.process(new KeyStreamFunc());
> 数据样式为:InPutInfo[phone='123456',workId='001']
> 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
> 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Flink Streaming API ElasticSearch Connector 长连接

2023-04-17 文章 Yuze Wei
各位大佬好!

我在使用Flink ES连接器的时候,有时候报以下错误:

Caused by: java.io.IOException
breakpoint : 远程主机强迫关闭了一个现有的连接

初步判断,应该是没有维持住长连接保活,所以如果一段时间不写入数据,连接就断了。

请问各位大佬,ElasticSearch Connector 有什么参数可以维持长连接吗?

ElasticSearch Connector 代码如下:

jsonStringStream
.sinkTo(
new Elasticsearch7SinkBuilder()
// Instructs the sink to emit after every Nth buffered 
element
.setBulkFlushMaxActions(1)
.setHosts(
new HttpHost(

Conn.getInstance().getProp("elasticsearch.hosts"),
Integer.parseInt(

Conn.getInstance().getProp("elasticsearch.port")
),

Conn.getInstance().getProp("elasticsearch.scheme")
)
)
.setEmitter(
(page, context, indexer) ->
indexer.add(
new 
IndexRequest(Conn.getInstance().getProp("elasticsearch.index.page"))
.source(page, 
XContentType.JSON)
)
)

.setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
.build());

报错如下:
[cid:71c0dada-02b0-4a0d-b16f-97bb0d65167f]

多谢指教!


Re: 用Flink Table API和RocksDB不能正常升级状态数据结构

2023-04-17 文章 Shammon FY
Hi

目前增减列数据会导致状态无法兼容

Best,
Shammon FY


On Fri, Apr 14, 2023 at 9:09 PM Elvis Chen 
wrote:

> 我们正在使用flink-1.16.0的Table API和RocksDB作为后端,为我们的用户提供运行SQL
>
> queries的服务。表格是使用Avro模式创建的,当以兼容的方式更改模式,例如添加一个带默认值的field时,我们无法从savepoint恢复作业。这是在数据结构升级后的报错:
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer
> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@aad5b03a) must
> not be incompatible with the old state serializer
> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@9d089984)
> ...
>


Re: 流数据转化为json

2023-04-17 文章 Shammon FY
Hi

对于kafka的问题,使用print或者其他方式有数据输出吗?可以通过这种方式确认一下是作业本身的数据问题还是kafka的问题

Best,
Shammon FY


On Fri, Apr 14, 2023 at 7:17 PM 小昌同学  wrote:

> 你好,请问一下上游的数据是
> SingleOutputStreamOperator outPutInfoStream =
> keyedStream.process(new KeyStreamFunc());
> 数据样式为:InPutInfo[phone='123456',workId='001']
> 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
> 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: sink mysql id自增表数据会丢失

2023-04-17 文章 Shammon FY
Hi

如果想使用mysql的自增主键,应该是在插入的时候不要写自增主键的列吧,可以在insert的时候直接指定需要插入的列试试?

On Sun, Apr 16, 2023 at 7:58 PM Jeff  wrote:

> sink数据到mysql catalog内的表时,当表只一个自增主键id无其唯一索引时,同一批写入的数据只会保存一条,其它数据会丢失。
>
>
>  mysql内表ddl:
>
> create table test (id bigint primary key auto_increment , passport
> varchar);
>
>
> flink sql:
> insert into mysql_catalog.test select 0, passport from source_table;
>
> 之所以select 0是表示使用物理表的自增值。