Re: Reply: How to define a table from a complex Kafka message body

2021-07-08 Thread Caizhi Weng
Hi! As in the article JasonLee shared, you can first describe the structure of the Kafka
message with DDL, and then extract APPLY_PERSON_ID and the other fields in your SQL code
via CREATE VIEW. That achieves the effect you need.

Each of your Kafka messages seems to correspond to exactly one row of MyUserTable, so there
does not appear to be any need for a column-to-row (explode) transformation.
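
For reference, a rough sketch of that approach (untested; the table name kafka_raw is made up,
it assumes the `data` array always carries the columns in the order of the sample message, and
the mixed string/number rawData values may need extra care in the json format):

CREATE TABLE kafka_raw (
    `data` ARRAY<ROW<`columnName` STRING, `rawData` STRING, `type` STRING>>,
    `operation` STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'topic_name',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.group.id' = 'testGroup',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

-- On Flink 1.10 the view may have to be registered through tEnv.sqlQuery(...) / createTemporaryView
-- instead of a CREATE VIEW statement. Flink SQL array indices start at 1.
CREATE VIEW MyUserTable AS
SELECT
    `data`[1].rawData AS APPLY_PERSON_ID,
    CAST(`data`[2].rawData AS DECIMAL(18, 2)) AS UPDATE_SALARY,
    CAST(`data`[3].rawData AS DECIMAL(18, 2)) AS UP_AMOUNT,
    `data`[4].rawData AS CURRENCY,
    CAST(`data`[5].rawData AS DECIMAL(18, 2)) AS EXCHANGE_RATE
FROM kafka_raw;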

Chenzhiyuan(HR) wrote on Fri, Jul 9, 2021 at 11:57 AM:

> Can the column-to-row transformation be written in the JSON DDL, or does it have to be done in Java code after the data is read from Kafka?
>
> -Original Message-
> From: JasonLee [mailto:17610775...@163.com]
> Sent: Friday, July 9, 2021 11:34 AM
> To: user-zh@flink.apache.org
> Subject: Re: How to define a table from a complex Kafka message body
>
> Hi
>
>
> In fact, your data field is a JSON array; you only need to transpose it (column to row) and take the columnName fields.
>
>
> Best
> JasonLee
>
>
> On Jul 9, 2021 at 10:06, Chenzhiyuan(HR) wrote:
> In the message body, data holds the actual row data, columnName is the field name and rawData is the value, so I would like to define the table as follows:
>
> CREATE TABLE MyUserTable(
> APPLY_PERSON_ID VARCHAR,
> UPDATE_SALARY DECIMAL,
> UP_AMOUNT DECIMAL,
> CURRENCY VARCHAR,
> EXCHANGE_RATE DECIMAL
> ) with (
> 'connector.type' = 'kafka',
> 'connector.version' = '0.11',
> 'connector.topic' = 'topic_name',
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
> 'connector.properties.bootstrap.servers' = 'localhost:9092',
> 'connector.properties.group.id' = 'testGroup',
> 'format.type' = '?'
> )
>
> Then query the value of each field directly:
> Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY,
> UP_AMOUNT, CURRENCY, EXCHANGE_RATE from MyUserTable ");
>
> Could you advise how to define the DDL for this?
>
>
>
> From: 17610775726 [mailto:17610775...@163.com]
> Sent: July 9, 2021 9:26
> To: Chenzhiyuan(HR)
> Subject: Re: How to define a table from a complex Kafka message body
>
> hi
>
> Just use the json format. You can refer to this article: https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA
>
> Best
> JasonLee
> ---- Original message ----
> From
>
> Chenzhiyuan(HR)
>
> Date
>
> July 9, 2021 08:59
>
> To
>
> user-zh@flink.apache.org
>
> Subject
>
> How to define a table from a complex Kafka message body
>
> Hi all:
> I defined a table that reads data from Kafka, but I don't know how to parse the messages, i.e. which format.type to use.
> If json or avro cannot handle it, do I have to implement a custom format myself?
> I'm not sure how to write a custom one, so any guidance would be appreciated.
>
> The table is defined as follows:
> CREATE TABLE MyUserTable(
> uuid VARCHAR,
> orgId VARCHAR
> ) with (
> 'connector.type' = 'kafka',
> 'connector.version' = '0.11',
> 'connector.topic' = 'topic_name',
> 'connector.properties.zookeeper.connect' = 'localhost:2181',
> 'connector.properties.bootstrap.servers' = 'localhost:9092',
> 'connector.properties.group.id' = 'testGroup',
> 'format.type' = '?'
> )
>
>
> The Kafka message body is as follows; it doesn't seem to match a standard format like Avro:
>
> {
> "beforeData": [],
> "byteSize": 272,
> "columnNumber": 32,
> "data": [{
> "byteSize": 8,
> "columnName": "APPLY_PERSON_ID",
> "rawData": 10017,
> "type": "LONG"
> }, {
> "byteSize": 12,
> "columnName": "UPDATE_SALARY",
> "rawData": "11000.00",
> "type": "DOUBLE"
> }, {
> "byteSize": 11,
> "columnName": "UP_AMOUNT",
> "rawData": "1000.00",
> "type": "DOUBLE"
> }, {
> "byteSize": 3,
> "columnName": "CURRENCY",
> "rawData": "CNY",
> "type": "STRING"
> }, {
> "byteSize": 32,
> "columnName": "EXCHANGE_RATE",
> "rawData": "1.00",
> "type": "DOUBLE"
> },  {
> "byteSize": 11,
> "columnName": "DEDUCTED_ACCOUNT",
> "rawData": "1000.00",
> "type": "DOUBLE"
> }, {
> "byteSize": 1,
> "columnName": "ENTER_AT_PROCESS",
> "rawData": "Y",
> "type": "STRING"
> }],
> "dataCount": 0,
> "dataMetaData": {
> "connector": "mysql",
> "pos": 1000368076,
> "row": 0,
> "ts_ms": 1625565737000,
> "snapshot": "false",
> "db": "testdb",
> "table": "flow_person_t"
> },
> "key": "APPLY_PERSON_ID",
> "memorySize": 1120,
> "operation": "insert",
> "rowIndex": -1,
> "timestamp": "1970-01-01 00:00:00"
> }
>


Re: How to define a table from a complex Kafka message body

2021-07-08 Thread 东东
Just implement a DeserializationFormatFactory yourself.




You can refer to the official CanalJsonFormatFactory or DebeziumJsonFormatFactory.
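
For orientation, a rough skeleton of such a factory on the Flink 1.11+ factory stack (untested;
'my-json' is a made-up format identifier, the actual message parsing is reduced to a stub, and on
Flink 1.10 the older TableFactory / 'format.type' stack would apply instead). The class also has
to be listed in META-INF/services/org.apache.flink.table.factories.Factory:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;

public class MyJsonFormatFactory implements DeserializationFormatFactory {

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context runtimeContext, DataType producedDataType) {
                return new StubRowDeserializer(
                        runtimeContext.createTypeInformation(producedDataType),
                        producedDataType.getChildren().size());
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    @Override
    public String factoryIdentifier() {
        return "my-json"; // referenced as 'format' = 'my-json' in the DDL
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    /** Placeholder runtime decoder; the real parsing of the message's "data" array goes here. */
    private static final class StubRowDeserializer implements DeserializationSchema<RowData> {

        private final TypeInformation<RowData> producedType;
        private final int arity;

        private StubRowDeserializer(TypeInformation<RowData> producedType, int arity) {
            this.producedType = producedType;
            this.arity = arity;
        }

        @Override
        public RowData deserialize(byte[] message) throws IOException {
            // TODO: parse the JSON envelope and fill the row fields
            return new GenericRowData(arity);
        }

        @Override
        public boolean isEndOfStream(RowData nextElement) {
            return false;
        }

        @Override
        public TypeInformation<RowData> getProducedType() {
            return producedType;
        }
    }
}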



On 2021-07-09 08:59:36, "Chenzhiyuan(HR)" wrote:
>Hi all:
>I defined a table that reads data from Kafka, but I don't know how to parse the messages, i.e. which format.type to use.
>If json or avro cannot handle it, do I have to implement a custom format myself?
>I'm not sure how to write a custom one, so any guidance would be appreciated.
>
>   The table is defined as follows:
>   CREATE TABLE MyUserTable(
>uuid VARCHAR,
>orgId VARCHAR
>) with (
>'connector.type' = 'kafka',
>'connector.version' = '0.11',
>'connector.topic' = 'topic_name',
>'connector.properties.zookeeper.connect' = 'localhost:2181',
>'connector.properties.bootstrap.servers' = 'localhost:9092',
>'connector.properties.group.id' = 'testGroup',
>'format.type' = '?'
>)
>
>
>The Kafka message body is as follows; it doesn't seem to match a standard format like Avro:
>
>{
>   "beforeData": [],
>"byteSize": 272,
>"columnNumber": 32,
>"data": [{
>"byteSize": 8,
>"columnName": "APPLY_PERSON_ID",
>"rawData": 10017,
>"type": "LONG"
>}, {
>"byteSize": 12,
>"columnName": "UPDATE_SALARY",
>"rawData": "11000.00",
>"type": "DOUBLE"
>}, {
>"byteSize": 11,
>"columnName": "UP_AMOUNT",
>"rawData": "1000.00",
>"type": "DOUBLE"
>}, {
>"byteSize": 3,
>"columnName": "CURRENCY",
>"rawData": "CNY",
>"type": "STRING"
>}, {
>"byteSize": 32,
>"columnName": "EXCHANGE_RATE",
>"rawData": "1.00",
>"type": "DOUBLE"
>},  {
>"byteSize": 11,
>"columnName": "DEDUCTED_ACCOUNT",
>"rawData": "1000.00",
>"type": "DOUBLE"
>}, {
>"byteSize": 1,
>"columnName": "ENTER_AT_PROCESS",
>"rawData": "Y",
>"type": "STRING"
>}],
>"dataCount": 0,
>"dataMetaData": {
>"connector": "mysql",
>"pos": 1000368076,
>"row": 0,
>"ts_ms": 1625565737000,
>"snapshot": "false",
>"db": "testdb",
>"table": "flow_person_t"
>},
>"key": "APPLY_PERSON_ID",
>"memorySize": 1120,
>"operation": "insert",
>"rowIndex": -1,
>"timestamp": "1970-01-01 00:00:00"
>}
>


Reply: Re: How to define a table from a complex Kafka message body

2021-07-08 Thread Chenzhiyuan(HR)
Can the column-to-row transformation be written in the JSON DDL, or does it have to be done in Java code after the data is read from Kafka?

-Original Message-
From: JasonLee [mailto:17610775...@163.com]
Sent: Friday, July 9, 2021 11:34 AM
To: user-zh@flink.apache.org
Subject: Re: How to define a table from a complex Kafka message body

Hi


In fact, your data field is a JSON array; you only need to transpose it (column to row) and take the columnName fields.


Best
JasonLee


On Jul 9, 2021 at 10:06, Chenzhiyuan(HR) wrote:
In the message body, data holds the actual row data, columnName is the field name and rawData is the value, so I would like to define the table as follows:

CREATE TABLE MyUserTable(
APPLY_PERSON_ID VARCHAR,
UPDATE_SALARY DECIMAL,
UP_AMOUNT DECIMAL,
CURRENCY VARCHAR,
EXCHANGE_RATE DECIMAL
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181', 
'connector.properties.bootstrap.servers' = 'localhost:9092', 
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
)

Then query the value of each field directly:
Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, 
CURRENCY, EXCHANGE_RATE from MyUserTable ");

Could you advise how to define the DDL for this?



From: 17610775726 [mailto:17610775...@163.com]
Sent: July 9, 2021 9:26
To: Chenzhiyuan(HR)
Subject: Re: How to define a table from a complex Kafka message body

hi

Just use the json format. You can refer to this article: https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA

Best
JasonLee
---- Original message ----
From

Chenzhiyuan(HR)

Date

July 9, 2021 08:59

To

user-zh@flink.apache.org

Subject

How to define a table from a complex Kafka message body

Hi all:
I defined a table that reads data from Kafka, but I don't know how to parse the messages, i.e. which format.type to use.
If json or avro cannot handle it, do I have to implement a custom format myself?
I'm not sure how to write a custom one, so any guidance would be appreciated.

The table is defined as follows:
CREATE TABLE MyUserTable(
uuid VARCHAR,
orgId VARCHAR
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181', 
'connector.properties.bootstrap.servers' = 'localhost:9092', 
'connector.properties.group.id' = 'testGroup', 'format.type' = '?'
)


The Kafka message body is as follows; it doesn't seem to match a standard format like Avro:

{
"beforeData": [],
"byteSize": 272,
"columnNumber": 32,
"data": [{
"byteSize": 8,
"columnName": "APPLY_PERSON_ID",
"rawData": 10017,
"type": "LONG"
}, {
"byteSize": 12,
"columnName": "UPDATE_SALARY",
"rawData": "11000.00",
"type": "DOUBLE"
}, {
"byteSize": 11,
"columnName": "UP_AMOUNT",
"rawData": "1000.00",
"type": "DOUBLE"
}, {
"byteSize": 3,
"columnName": "CURRENCY",
"rawData": "CNY",
"type": "STRING"
}, {
"byteSize": 32,
"columnName": "EXCHANGE_RATE",
"rawData": "1.00",
"type": "DOUBLE"
},  {
"byteSize": 11,
"columnName": "DEDUCTED_ACCOUNT",
"rawData": "1000.00",
"type": "DOUBLE"
}, {
"byteSize": 1,
"columnName": "ENTER_AT_PROCESS",
"rawData": "Y",
"type": "STRING"
}],
"dataCount": 0,
"dataMetaData": {
"connector": "mysql",
"pos": 1000368076,
"row": 0,
"ts_ms": 1625565737000,
"snapshot": "false",
"db": "testdb",
"table": "flow_person_t"
},
"key": "APPLY_PERSON_ID",
"memorySize": 1120,
"operation": "insert",
"rowIndex": -1,
"timestamp": "1970-01-01 00:00:00"
}


Reply: How to define a table from a complex Kafka message body

2021-07-08 Thread JasonLee
Hi


In fact, your data field is a JSON array; you only need to transpose it (column to row) and take the columnName fields.


Best
JasonLee


On Jul 9, 2021 at 10:06, Chenzhiyuan(HR) wrote:
In the message body, data holds the actual row data, columnName is the field name and rawData is the value, so I would like to define the table as follows:

CREATE TABLE MyUserTable(
APPLY_PERSON_ID VARCHAR,
UPDATE_SALARY DECIMAL,
UP_AMOUNT DECIMAL,
CURRENCY VARCHAR,
EXCHANGE_RATE DECIMAL
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
)

Then query the value of each field directly:
Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, 
CURRENCY, EXCHANGE_RATE from MyUserTable ");

Could you advise how to define the DDL for this?



From: 17610775726 [mailto:17610775...@163.com]
Sent: July 9, 2021 9:26
To: Chenzhiyuan(HR)
Subject: Re: How to define a table from a complex Kafka message body

hi

Just use the json format. You can refer to this article: https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA

Best
JasonLee
---- Original message ----
From

Chenzhiyuan(HR)

Date

July 9, 2021 08:59

To

user-zh@flink.apache.org

Subject

How to define a table from a complex Kafka message body

Hi all:
I defined a table that reads data from Kafka, but I don't know how to parse the messages, i.e. which format.type to use.
If json or avro cannot handle it, do I have to implement a custom format myself?
I'm not sure how to write a custom one, so any guidance would be appreciated.

The table is defined as follows:
CREATE TABLE MyUserTable(
uuid VARCHAR,
orgId VARCHAR
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
)


The Kafka message body is as follows; it doesn't seem to match a standard format like Avro:

{
"beforeData": [],
"byteSize": 272,
"columnNumber": 32,
"data": [{
"byteSize": 8,
"columnName": "APPLY_PERSON_ID",
"rawData": 10017,
"type": "LONG"
}, {
"byteSize": 12,
"columnName": "UPDATE_SALARY",
"rawData": "11000.00",
"type": "DOUBLE"
}, {
"byteSize": 11,
"columnName": "UP_AMOUNT",
"rawData": "1000.00",
"type": "DOUBLE"
}, {
"byteSize": 3,
"columnName": "CURRENCY",
"rawData": "CNY",
"type": "STRING"
}, {
"byteSize": 32,
"columnName": "EXCHANGE_RATE",
"rawData": "1.00",
"type": "DOUBLE"
},  {
"byteSize": 11,
"columnName": "DEDUCTED_ACCOUNT",
"rawData": "1000.00",
"type": "DOUBLE"
}, {
"byteSize": 1,
"columnName": "ENTER_AT_PROCESS",
"rawData": "Y",
"type": "STRING"
}],
"dataCount": 0,
"dataMetaData": {
"connector": "mysql",
"pos": 1000368076,
"row": 0,
"ts_ms": 1625565737000,
"snapshot": "false",
"db": "testdb",
"table": "flow_person_t"
},
"key": "APPLY_PERSON_ID",
"memorySize": 1120,
"operation": "insert",
"rowIndex": -1,
"timestamp": "1970-01-01 00:00:00"
}


Re: How to read large amount of data from hive and write to redis, in a batch manner?

2021-07-08 Thread Yik San Chan
Hi Piotr,

Yah thanks a lot for your help. For future reference, what I did was simply:

1. Copy the whole BufferingSink as in docs
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
2. In its `invoke` method, I batch write everything in bufferedElements
into Redis using Redis pipelining.

That's pretty much it. Thank you for your help again!
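
For future readers, a condensed sketch of that combination (untested; it assumes the Jedis
client, a hard-coded localhost Redis, and a stream of key/value pairs modeled as
Tuple2<String, String> - it mirrors the BufferingSink example from the linked docs with the
flush going through a Redis pipeline):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

import java.util.ArrayList;
import java.util.List;

public class BufferingRedisSink
        implements SinkFunction<Tuple2<String, String>>, CheckpointedFunction {

    private final int threshold;
    private transient ListState<Tuple2<String, String>> checkpointedState;
    private final List<Tuple2<String, String>> bufferedElements = new ArrayList<>();

    public BufferingRedisSink(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            flushToRedis();
        }
    }

    private void flushToRedis() {
        // one round trip for the whole batch via Redis pipelining
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            Pipeline pipeline = jedis.pipelined();
            for (Tuple2<String, String> element : bufferedElements) {
                pipeline.set(element.f0, element.f1);
            }
            pipeline.sync();
        }
        bufferedElements.clear();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // keep un-flushed elements in state so they survive a failure
        checkpointedState.clear();
        for (Tuple2<String, String> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, String>> descriptor = new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            for (Tuple2<String, String> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}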


On Thu, Jul 8, 2021 at 11:09 PM Piotr Nowojski  wrote:

> Great, thanks for coming back and I'm glad that it works for you!
>
> Piotrek
>
> On Thu, Jul 8, 2021 at 13:34 Yik San Chan wrote:
>
>> Hi Piotr,
>>
>> Thanks! I end up doing option 1, and that works great.
>>
>> Best,
>> Yik San
>>
>> On Tue, May 25, 2021 at 11:43 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> You could always buffer records in your sink function/operator, until a
>>> large enough batch is accumulated and upload the whole batch at once. Note
>>> that if you want to have at-least-once or exactly-once semantics, you would
>>> need to take care of those buffered records in one way or another. For
>>> example you could:
>>> 1. Buffer records on some in memory data structure (not Flink's state),
>>> and just make sure that those records are flushed to the underlying sink on
>>> `CheckpointedFunction#snapshotState()` calls
>>> 2. Buffer records on Flink's state (heap state backend or rocksdb - heap
>>> state backend would be the fastest with little overhead, but you can risk
>>> running out of memory), and that would easily give you exactly-once. That
>>> way your batch could span multiple checkpoints.
>>> 3. Buffer/write records to temporary files, but in that case keep in
>>> mind that those files need to be persisted and recovered in case of failure
>>> and restart.
>>> 4. Ignore checkpointing and either always restart the job from scratch
>>> or accept some occasional data loss.
>>>
>>> FYI, virtually every connector/sink is internally batching writes to
>>> some extent. Usually by doing option 1.
>>>
>>> Piotrek
>>>
>>> On Tue, May 25, 2021 at 14:50 Yik San Chan wrote:
>>>
 Hi community,

 I have a Hive table that stores tens of millions rows of data. In my
 Flink job, I want to process the data in batch manner:

 - Split the data into batches, each batch has (maybe) 10,000 rows.
 - For each batch, call a batchPut() API on my redis client to dump in
 Redis.

 Doing so in a streaming manner is not expected, as that will cause too
 many round trips between Flink workers and Redis.

 Is there a way to do that? I find little clue in Flink docs, since
 almost all APIs feel better suited for streaming processing by default.

 Thank you!

 Best,
 Yik San

>>>


Re: Job Recovery Time on TM Lost

2021-07-08 Thread Gen Luo
@刘建刚
Welcome to the discussion, and thanks for sharing your experience.

I have a minor question. In my experience, network failures in a given
cluster usually take some time to recover, which can be measured (e.g. as a p99)
to guide the configuration. So I suppose it would be better to use time rather than
attempt count as the configuration for confirming TM liveness. What do you think
about this? And is the premise right according to your experience?

@Lu Niu 
> Does that mean the akka timeout situation we talked above doesn't apply
to flink 1.11?

I suppose it's true. According to the reply from Till in FLINK-23216,
it should be confirmed that the problem is introduced by declarative
resource management, which was introduced to Flink in 1.12.

In previous versions, although the JM still uses heartbeats to check TM status,
the RM will tell the JM about a lost TM once YARN notices it. This is much
faster than the JM's heartbeat mechanism, if one uses the default heartbeat
configurations. However, since 1.12 with declarative resource management,
the RM will no longer tell this to the JM, since it doesn't have a related
AllocationID. So the heartbeat mechanism becomes the only way the JM can learn
about a lost TM.

On Fri, Jul 9, 2021 at 6:34 AM Lu Niu  wrote:

> Thanks everyone! This is a great discussion!
>
> 1. Restarting takes 30s when throwing exceptions from application code
> because the restart delay is 30s in config. Before lots of related config
> are 30s which lead to the confusion. I redo the test with config:
>
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
> backoffTimeMS=1000)
> heartbeat.timeout: 50
> akka.ask.timeout 30 s
> akka.lookup.timeout 30 s
> akka.tcp.timeout 30 s
> akka.watch.heartbeat.interval 30 s
> akka.watch.heartbeat.pause 120 s
>
>Now Phase 1 drops down to 2s now and phase 2 takes 13s. The whole
> restart takes 14s. Does that mean the akka timeout situation we talked
> above doesn't apply to flink 1.11?
>
> 2. About flaky connection between TMs, we did notice sometimes exception
> as follows:
> ```
> TaskFoo switched from RUNNING to FAILED on
> container_e02_1599158147594_156068_01_38 @
> xenon-prod-001-20200316-data-slave-prod-0a0201a4.ec2.pin220.com
> (dataPort=40957).
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager '
> xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539'.
> This might indicate that the remote task manager was lost.
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
> at
> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:331)
> at
> 

Does flink-sql have an equivalent of Hive's DISTRIBUTE BY?

2021-07-08 Thread woods
Does flink-sql have functionality similar to Hive's DISTRIBUTE BY, where data rows are hashed
to different tasks based on some field?


Reply: Re: How to define a table from a complex Kafka message body

2021-07-08 Thread Chenzhiyuan(HR)
In the message body, data holds the actual row data, columnName is the field name and rawData is the value, so I would like to define the table as follows:

CREATE TABLE MyUserTable(
APPLY_PERSON_ID VARCHAR,
UPDATE_SALARY DECIMAL,
UP_AMOUNT DECIMAL,
CURRENCY VARCHAR,
EXCHANGE_RATE DECIMAL
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
)

Then query the value of each field directly:
Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, 
CURRENCY, EXCHANGE_RATE from MyUserTable ");

Could you advise how to define the DDL for this?



From: 17610775726 [mailto:17610775...@163.com]
Sent: July 9, 2021 9:26
To: Chenzhiyuan(HR)
Subject: Re: How to define a table from a complex Kafka message body

hi

Just use the json format. You can refer to this article: https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA

Best
JasonLee
---- Original message ----
From

Chenzhiyuan(HR)

Date

July 9, 2021 08:59

To

user-zh@flink.apache.org

Subject

How to define a table from a complex Kafka message body

Hi all:
I defined a table that reads data from Kafka, but I don't know how to parse the messages, i.e. which format.type to use.
If json or avro cannot handle it, do I have to implement a custom format myself?
I'm not sure how to write a custom one, so any guidance would be appreciated.

The table is defined as follows:
  CREATE TABLE MyUserTable(
uuid VARCHAR,
orgId VARCHAR
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
)


The Kafka message body is as follows; it doesn't seem to match a standard format like Avro:

{
  "beforeData": [],
   "byteSize": 272,
   "columnNumber": 32,
   "data": [{
   "byteSize": 8,
   "columnName": "APPLY_PERSON_ID",
   "rawData": 10017,
   "type": "LONG"
   }, {
   "byteSize": 12,
   "columnName": "UPDATE_SALARY",
   "rawData": "11000.00",
   "type": "DOUBLE"
   }, {
   "byteSize": 11,
   "columnName": "UP_AMOUNT",
   "rawData": "1000.00",
   "type": "DOUBLE"
   }, {
   "byteSize": 3,
   "columnName": "CURRENCY",
   "rawData": "CNY",
   "type": "STRING"
   }, {
   "byteSize": 32,
   "columnName": "EXCHANGE_RATE",
   "rawData": "1.00",
   "type": "DOUBLE"
   },  {
   "byteSize": 11,
   "columnName": "DEDUCTED_ACCOUNT",
   "rawData": "1000.00",
   "type": "DOUBLE"
   }, {
   "byteSize": 1,
   "columnName": "ENTER_AT_PROCESS",
   "rawData": "Y",
   "type": "STRING"
   }],
   "dataCount": 0,
   "dataMetaData": {
   "connector": "mysql",
   "pos": 1000368076,
   "row": 0,
   "ts_ms": 1625565737000,
   "snapshot": "false",
   "db": "testdb",
   "table": "flow_person_t"
   },
   "key": "APPLY_PERSON_ID",
   "memorySize": 1120,
   "operation": "insert",
   "rowIndex": -1,
   "timestamp": "1970-01-01 00:00:00"
}


Unsubscribe

2021-07-08 Thread Yu Wang
Unsubscribe


Re: Flink cep checkpoint size

2021-07-08 Thread Li Jim
Hi, Dawid.
Thanks for replying, happy to know you are working on this.

On 2021/07/08 12:14:21, Dawid Wysakowicz  wrote: 
> Hi,
> 
> Sorry for the late reply.
> 
> Indeed I found a couple of problems with clearing the state for short
> lived keys. I created a JIRA[1] issue to track it and opened a PR (which
> needs test coverage before it can be merged) with fixes for those.
> 
> Best,
> 
> Dawid
> 
> [1] https://issues.apache.org/jira/browse/FLINK-23314
> 
> On 06/07/2021 09:11, Li Jim wrote:
> > Hi, Mohit,
> >
> > Have you figured out any solutions to this problem?
> >
> > I am now facing exactly the same problem.
> >
> > I was using Flink version 1.12.0 and I also upgraded it to 1.13.1, but
> > the checkpoint size is still growing.
> >
> > On 2021/06/02 15:45:59, "Singh, Mohit"  wrote: 
> >> Hi,
> >>
> >> I am facing an issue with cep operator where checkpoint size keeps 
> >> increasing even though the pattern is fully matched. I have a stream with 
> >> unique user id and I want to detect a pattern of product purchased by user.
> >>
> >> here is the sample stream data
> >>
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product1","bids":3,"ts":"1622644781243"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product2","bids":6,"ts":"1622644781245"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product2","bids":4,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product2","bids":2,"ts":"1622644781247"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product2","bids":1,"ts":"1622644781248"}
> >> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
> >> ","product":"product3","bids":1,"ts":"1622644781248"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product1","bids":3,"ts":"1622644782235"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product2","bids":6,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product2","bids":4,"ts":"1622644782236"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product2","bids":2,"ts":"1622644782237"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product2","bids":1,"ts":"1622644782238"}
> >> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
> >> ","product":"product3","bids":1,"ts":"1622644782239"}
> >> …..
> >> …..
> >>
> >> StreamExecutionEnvironment env = 
> >> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> env.setParallelism(1);
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "localhost:9092");
> >> properties.setProperty("group.id", "cep");
> >> DataStream<orders> stream = env.addSource(
> >> new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
> >> .map(json -> gson.fromJson(json, orders.class))
> >> .assignTimestampsAndWatermarks(
> >> WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
> >> .withTimestampAssigner((orders, timestamp) -> orders.ts)
> >> );
> >> Pattern<orders, ?> pattern = Pattern.<orders>begin(
> >> "start",
> >> AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition<orders>() {
> >> @Override
> >> public boolean filter(orders value) throws Exception {
> >> return value.product.equals("product1");
> >> }
> >> }).times(1).followedBy("middle").where(new SimpleCondition<orders>() {
> >> @Override
> >> public boolean filter(orders value) throws Exception {
> >> return value.product.equals("product2");
> >> }
> >> }).oneOrMore().until(new SimpleCondition<orders>() {
> >> @Override
> >> public boolean filter(orders value) throws Exception {
> >> return value.product.equals("product3");
> >> }
> >> }).within(Time.seconds(10));
> >> PatternStream<orders> patternStream =
> >> CEP.pattern(stream.keyBy((KeySelector<orders, String>) orders -> orders.user_id), pattern);
> >> DataStream<String> alerts = patternStream.select((PatternSelectFunction<orders, String>) matches ->
> >> matches.get("start").get(0).user_id + "->" +
> >> matches.get("middle").get(0).ts);
> >> alerts.print();
> >>
> >>
> >>
> >> I have also attached the checkpoint file.
> >>
> >> It looks like the NFA state keeps track of all keys seen and the start
> >> state, and that leads to an increase in checkpoint size if the keys are not
> >> reused in patterns. So, if I have a fixed number of keys the size does not
> >> increase. Is this the expected behavior and a correct understanding?
> >> Is there a way to drop these keys once the pattern is matched? Or am I
> >> missing 

Apache Flink - Reading Avro messages from Kafka with schema in schema registry

2021-07-08 Thread M Singh
Hi:
I am trying to read avro encoded messages from Kafka with schema registered in 
schema registry.
I am using the class
(https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html)
with the method:

static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(...)

The arguments for this method are:
forGeneric(org.apache.avro.Schema schema, String url) - Creates a
ConfluentRegistryAvroDeserializationSchema that produces GenericRecord using
the provided reader schema and looks up the writer schema in the Confluent
Schema Registry.
As I understand, the schema is the reader schema and the url is the schema 
registry url used to retrieve writer schema.  
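
For context, wiring this into a job looks roughly like the following (a sketch only; the topic
name, registry URL and properties are placeholders, and the reader schema JSON is elided):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaProperties.setProperty("group.id", "group1");

// the reader schema the job expects (schema JSON elided here)
Schema readerSchema = new Schema.Parser().parse("{ ... reader schema JSON ... }");

FlinkKafkaConsumer<GenericRecord> source = new FlinkKafkaConsumer<>(
        "topic1",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://localhost:8081"),
        kafkaProperties);

DataStream<GenericRecord> stream = env.addSource(source);
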
I have a few questions:
1. Why do we need the writer schema from the registry? Is it to validate that the reader schema is the same as the writer schema?
2. Since the url of the schema registry is provided, why do we need to provide the reader schema? Can the schema be retrieved at run time from the avro message metadata dynamically and then cached (as shown in the example snippet from confluent below)?
The confluent consumer example 
(https://docs.confluent.io/5.0.0/schema-registry/docs/serializer-formatter.html)
 has the following example snippet where the schema.registry.url is provided to 
the consumer and the message can be converted to generic record using the 
KafkaAvroDeserializer without the need to pass the reader schema.

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

String topic = "topic1";
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
  while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
    for (ConsumerRecord<String, GenericRecord> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s \n",
          record.offset(), record.key(), record.value());
    }
  }
} finally {
  consumer.close();
}


Please let me know if I have missed anything and whether there is a way to read avro
encoded messages from kafka with the schema registry without requiring a reader
schema.
Thanks






How to define a table from a complex Kafka message body

2021-07-08 Thread Chenzhiyuan(HR)
Hi all:
I defined a table that reads data from Kafka, but I don't know how to parse the messages, i.e. which format.type to use.
If json or avro cannot handle it, do I have to implement a custom format myself?
I'm not sure how to write a custom one, so any guidance would be appreciated.

   The table is defined as follows:
   CREATE TABLE MyUserTable(
uuid VARCHAR,
orgId VARCHAR
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
)


The Kafka message body is as follows; it doesn't seem to match a standard format like Avro:

{
   "beforeData": [],
"byteSize": 272,
"columnNumber": 32,
"data": [{
"byteSize": 8,
"columnName": "APPLY_PERSON_ID",
"rawData": 10017,
"type": "LONG"
}, {
"byteSize": 12,
"columnName": "UPDATE_SALARY",
"rawData": "11000.00",
"type": "DOUBLE"
}, {
"byteSize": 11,
"columnName": "UP_AMOUNT",
"rawData": "1000.00",
"type": "DOUBLE"
}, {
"byteSize": 3,
"columnName": "CURRENCY",
"rawData": "CNY",
"type": "STRING"
}, {
"byteSize": 32,
"columnName": "EXCHANGE_RATE",
"rawData": "1.00",
"type": "DOUBLE"
},  {
"byteSize": 11,
"columnName": "DEDUCTED_ACCOUNT",
"rawData": "1000.00",
"type": "DOUBLE"
}, {
"byteSize": 1,
"columnName": "ENTER_AT_PROCESS",
"rawData": "Y",
"type": "STRING"
}],
"dataCount": 0,
"dataMetaData": {
"connector": "mysql",
"pos": 1000368076,
"row": 0,
"ts_ms": 1625565737000,
"snapshot": "false",
"db": "testdb",
"table": "flow_person_t"
},
"key": "APPLY_PERSON_ID",
"memorySize": 1120,
"operation": "insert",
"rowIndex": -1,
"timestamp": "1970-01-01 00:00:00"
}



Re: Job Recovery Time on TM Lost

2021-07-08 Thread Lu Niu
Thanks everyone! This is a great discussion!

1. Restarting takes 30s when throwing exceptions from application code
because the restart delay is 30s in the config. Previously, lots of related configs
were 30s, which led to the confusion. I redid the test with this config:

FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
backoffTimeMS=1000)
heartbeat.timeout: 50
akka.ask.timeout 30 s
akka.lookup.timeout 30 s
akka.tcp.timeout 30 s
akka.watch.heartbeat.interval 30 s
akka.watch.heartbeat.pause 120 s

   Now phase 1 drops down to 2s and phase 2 takes 13s. The whole
restart takes 14s. Does that mean the akka timeout situation we talked
about above doesn't apply to Flink 1.11?

2. About flaky connections between TMs, we did sometimes notice exceptions such as
the following:
```
TaskFoo switched from RUNNING to FAILED on
container_e02_1599158147594_156068_01_38 @
xenon-prod-001-20200316-data-slave-prod-0a0201a4.ec2.pin220.com
(dataPort=40957).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager '
xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539'.
This might indicate that the remote task manager was lost.
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:331)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
```
1. It's a bit inconvenient to debug such an exception because it doesn't
report the exact container id. Right now we have to look for `
xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539`
in the JobManager log to find that.
2. The task manager log doesn't show anything suspicious. Also, no major
GC. So it might imply a flaky connection in this case.
3. Is there any short-term workaround we can try? Any config tuning? Also,
what's the long-term solution?

Best
Lu




On Tue, Jul 6, 2021 at 11:45 PM 刘建刚  wrote:

> It is really helpful to find the lost container quickly. In our inner
> flink version, we optimize it by task's report and jobmaster's probe. When
> a task fails because of the connection, it reports to the jobmaster. The
> jobmaster will try to confirm the liveness of the unconnected
> taskmanager a certain number of times (configurable). If the jobmaster finds the
> taskmanager unconnected or dead, it releases the taskmanager. This will work
> for most cases. For an unstable environment, config needs adjustment.
>
> Gen Luo wrote on Tue, Jul 6, 2021 at 8:41 PM:
>
>> Yes, I have noticed the PR and commented there with some consideration
>> about the new option. 

Error when trying to setup and run wordcount example on dataproc

2021-07-08 Thread Joey Tran
Hi!

I'm trying to just get my bearings with dataproc and flink/beam. When
trying to run the wordcount example with a long-running YARN session,
I hit an error. The error looks like:

Traceback (most recent call last):
  File "wordcount.py", line 99, in 
run()
  File "wordcount.py", line 94, in run
output | 'Write' >> WriteToText(known_args.output)
  File
"/home/jtran/.local/lib/python3.8/site-packages/apache_beam/pipeline.py",
line 586, in __exit__
self.result.wait_until_finish()
  File
"/home/jtran/.local/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py",
line 599, in wait_until_finish
raise self._runtime_exception
RuntimeError: Pipeline
BeamApp-jtran-0708141941-5fcff870_effe0238-2afb-4707-a981-133bc46618fd
failed in state FAILED: java.util.ServiceConfigurationError:
com.fasterxml.jackson.databind.Module: Provider
com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype

flink version 1.12, dataproc image 2.0, beam version 2.30


I found some mention of this when trying to run the example with EMR but no
mention with dataproc (https://issues.apache.org/jira/browse/BEAM-10430).
The workaround is unclear to me, is there something wrong I'm doing with
setting this up? Any advice would be greatly appreciated. Thanks in advance!


Re: How to read large amount of data from hive and write to redis, in a batch manner?

2021-07-08 Thread Piotr Nowojski
Great, thanks for coming back and I'm glad that it works for you!

Piotrek

On Thu, Jul 8, 2021 at 13:34 Yik San Chan wrote:

> Hi Piotr,
>
> Thanks! I end up doing option 1, and that works great.
>
> Best,
> Yik San
>
> On Tue, May 25, 2021 at 11:43 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> You could always buffer records in your sink function/operator, until a
>> large enough batch is accumulated and upload the whole batch at once. Note
>> that if you want to have at-least-once or exactly-once semantics, you would
>> need to take care of those buffered records in one way or another. For
>> example you could:
>> 1. Buffer records on some in memory data structure (not Flink's state),
>> and just make sure that those records are flushed to the underlying sink on
>> `CheckpointedFunction#snapshotState()` calls
>> 2. Buffer records on Flink's state (heap state backend or rocksdb - heap
>> state backend would be the fastest with little overhead, but you can risk
>> running out of memory), and that would easily give you exactly-once. That
>> way your batch could span multiple checkpoints.
>> 3. Buffer/write records to temporary files, but in that case keep in mind
>> that those files need to be persisted and recovered in case of failure and
>> restart.
>> 4. Ignore checkpointing and either always restart the job from scratch or
>> accept some occasional data loss.
>>
>> FYI, virtually every connector/sink is internally batching writes to some
>> extent. Usually by doing option 1.
>>
>> Piotrek
>>
>> On Tue, May 25, 2021 at 14:50 Yik San Chan wrote:
>>
>>> Hi community,
>>>
>>> I have a Hive table that stores tens of millions rows of data. In my
>>> Flink job, I want to process the data in batch manner:
>>>
>>> - Split the data into batches, each batch has (maybe) 10,000 rows.
>>> - For each batch, call a batchPut() API on my redis client to dump in
>>> Redis.
>>>
>>> Doing so in a streaming manner is not expected, as that will cause too
>>> many round trips between Flink workers and Redis.
>>>
>>> Is there a way to do that? I find little clue in Flink docs, since
>>> almost all APIs feel better suited for streaming processing by default.
>>>
>>> Thank you!
>>>
>>> Best,
>>> Yik San
>>>
>>


Re: My batch source doesn't emit MAX_WATERMARK when it finishes - why?

2021-07-08 Thread Dawid Wysakowicz
Hi,

Your example does not show what watermarks are flowing through the
program. It prints the watermark at the point a record is being emitted.
As the cited text states, the final watermark is emitted after all
records are emitted. You can test it e.g. with the newly added
writeWatermark method in 1.14 or by implementing a ProcessFunction with
a timer for Long.MAX_VALUE, or lastly with a custom operator.
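
For illustration, a minimal sketch of the timer variant (assumes a keyed stream of String
values; the class name is made up):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Fires once per key when the final watermark (Long.MAX_VALUE) arrives at end of input.
public class EndOfInputDetector extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // registering the same timestamp repeatedly is deduplicated by the timer service
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("end of input reached for key " + ctx.getCurrentKey());
    }
}

It would be applied after a keyBy, e.g. stream.keyBy(v -> v).process(new EndOfInputDetector()).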

Best,

Dawid

On 08/07/2021 14:51, Yik San Chan wrote:
> Hi,
>
> According to the docs [1]
>
> When a source reaches the end of the input, it emits a final watermark
> with timestamp Long.MAX_VALUE, indicating the "end of time".
>
>
> However, in my small experiment [2], the Flink job reads from a local
> csv file, and prints a watermark for each record in the SinkFunction
> `invoke` method. Even though I expect the last record comes with a
> MAX_VALUE watermark, all records actually come with a MIN_VALUE watermark.
>
>
> ```
>
> watermark=-9223372036854775808
> watermark=-9223372036854775808
> 1
> 10
>
> ```
>
>
> I wonder what I miss? Is there a way to make sure the source generates
> a MAX_VALUE watermark after it finishes all records?
>
> Thank you!
>
> [1] 
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/eventtime/Watermark.java#L39
> 
>
> [2] https://github.com/YikSanChan/flink-batch-source-watermark
> 




Re: Lot of java.util.zip.ZipFile$Source in JobManager's Heap

2021-07-08 Thread Chesnay Schepler
Those are normal and point to the JDK and Flink classes which remain 
loaded for the lifetime of the JVM.


On 08/07/2021 14:31, Pranjul Ahuja wrote:

Hi,

On analyzing the heap dump of the JobManager process, I am seeing a lot of 
instances of java.util.zip.ZipFile$Source which includes open file handles to 
jar files. These instances are never garbage collected. I am also observing 
that the Resident Memory of the process never goes down. It could be because of 
the presence of so many instances of ZipFile$Source as well but I am not sure. 
We are not using RocksDb. Has anyone seen this issue or is this expected?

Flink Version: 1.13.1
JDK Version: OpenJDK version "11.0.11"

Thanks,
Pranjul





My batch source doesn't emit MAX_WATERMARK when it finishes - why?

2021-07-08 Thread Yik San Chan
Hi,

According to the docs [1]

When a source reaches the end of the input, it emits a final watermark with
timestamp Long.MAX_VALUE, indicating the "end of time".


However, in my small experiment [2], the Flink job reads from a local csv
file, and prints a watermark for each record in the SinkFunction `invoke`
method. Even though I expect the last record comes with a MAX_VALUE
watermark, all records actually come with a MIN_VALUE watermark.


```

watermark=-9223372036854775808
watermark=-9223372036854775808
1
10

```

I wonder what I miss? Is there a way to make sure the source generates a
MAX_VALUE watermark after it finishes all records?

Thank you!

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/eventtime/Watermark.java#L39

[2] https://github.com/YikSanChan/flink-batch-source-watermark


Lot of java.util.zip.ZipFile$Source in JobManager's Heap

2021-07-08 Thread Pranjul Ahuja
Hi,

On analyzing the heap dump of the JobManager process, I am seeing a lot of 
instances of java.util.zip.ZipFile$Source which includes open file handles to 
jar files. These instances are never garbage collected. I am also observing 
that the Resident Memory of the process never goes down. It could be because of 
the presence of so many instances of ZipFile$Source as well but I am not sure. 
We are not using RocksDb. Has anyone seen this issue or is this expected?

Flink Version: 1.13.1
JDK Version: OpenJDK version "11.0.11"

Thanks,
Pranjul


Re: Flink cep checkpoint size

2021-07-08 Thread Dawid Wysakowicz
Hi,

Sorry for the late reply.

Indeed I found a couple of problems with clearing the state for short
lived keys. I created a JIRA[1] issue to track it and opened a PR (which
needs test coverage before it can be merged) with fixes for those.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-23314

On 06/07/2021 09:11, Li Jim wrote:
> Hi, Mohit,
>
> Have you figured out any solutions to this problem?
>
> I am now facing exactly the same problem.
>
> I was using Flink version 1.12.0 and I also upgraded it to 1.13.1, but the
> checkpoint size is still growing.
>
> On 2021/06/02 15:45:59, "Singh, Mohit"  wrote: 
>> Hi,
>>
>> I am facing an issue with cep operator where checkpoint size keeps 
>> increasing even though the pattern is fully matched. I have a stream with 
>> unique user id and I want to detect a pattern of product purchased by user.
>>
>> here is the sample stream data
>>
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product1","bids":3,"ts":"1622644781243"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product2","bids":6,"ts":"1622644781245"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product2","bids":4,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product2","bids":2,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product2","bids":1,"ts":"1622644781248"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product3","bids":1,"ts":"1622644781248"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product1","bids":3,"ts":"1622644782235"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product2","bids":6,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product2","bids":4,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product2","bids":2,"ts":"1622644782237"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product2","bids":1,"ts":"1622644782238"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product3","bids":1,"ts":"1622644782239"}
>> …..
>> …..
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setParallelism(1);
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> properties.setProperty("group.id", "cep");
>> DataStream<orders> stream = env.addSource(
>> new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>> .map(json -> gson.fromJson(json, orders.class))
>> .assignTimestampsAndWatermarks(
>> WatermarkStrategy.<orders>forBoundedOutOfOrderness(Duration.ofSeconds(2))
>> .withTimestampAssigner((orders, timestamp) -> orders.ts)
>> );
>> Pattern<orders, ?> pattern = Pattern.<orders>begin(
>> "start",
>> AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition<orders>() {
>> @Override
>> public boolean filter(orders value) throws Exception {
>> return value.product.equals("product1");
>> }
>> }).times(1).followedBy("middle").where(new SimpleCondition<orders>() {
>> @Override
>> public boolean filter(orders value) throws Exception {
>> return value.product.equals("product2");
>> }
>> }).oneOrMore().until(new SimpleCondition<orders>() {
>> @Override
>> public boolean filter(orders value) throws Exception {
>> return value.product.equals("product3");
>> }
>> }).within(Time.seconds(10));
>> PatternStream<orders> patternStream =
>> CEP.pattern(stream.keyBy((KeySelector<orders, String>) orders -> orders.user_id), pattern);
>> DataStream<String> alerts = patternStream.select((PatternSelectFunction<orders, String>) matches ->
>> matches.get("start").get(0).user_id + "->" +
>> matches.get("middle").get(0).ts);
>> alerts.print();
>>
>>
>>
>> I have also attached the checkpoint file.
>>
>> It looks like the NFA state keeps track of all keys seen and the start state,
>> and that leads to an increase in checkpoint size if the keys are not reused in
>> patterns. So, if I have a fixed number of keys the size does not increase. Is
>> this the expected behavior and a correct understanding?
>> Is there a way to drop these keys once the pattern is matched? Or am I
>> missing something here?
>>
>> Thanks,
>> Mohit
>>





How to register a customized serializer for the Flink Kafka format type

2021-07-08 Thread Chenzhiyuan(HR)
I created the table as below, and the data comes from Kafka.
I want to deserialize the JSON message into a POJO object.
But the message format is neither Avro nor plain JSON.
So I need to know how to register a customized deserializer and use it for the
'format.type' property.
By the way, my Flink version is 1.10.0.
CREATE TABLE MyUserTable(
uuid VARCHAR,
orgId VARCHAR
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = 'cutormizeSerializer'
)
A sample Kafka message body; each columnName is a key for the POJO object, and
rawData is the value:
{
   "beforeData": [],
"byteSize": 272,
"columnNumber": 32,
"data": [{
"byteSize": 8,
"columnName": "APPLY_PERSON_ID",
"rawData": 10017,
"type": "LONG"
}, {
"byteSize": 12,
"columnName": "UPDATE_SALARY",
"rawData": "11000.00",
"type": "DOUBLE"
}, {
"byteSize": 11,
"columnName": "UP_AMOUNT",
"rawData": "1000.00",
"type": "DOUBLE"
}, {
"byteSize": 3,
"columnName": "CURRENCY",
"rawData": "CNY",
"type": "STRING"
}, {
"byteSize": 32,
"columnName": "EXCHANGE_RATE",
"rawData": "1.00",
"type": "DOUBLE"
},  {
"byteSize": 11,
"columnName": "DEDUCTED_ACCOUNT",
"rawData": "1000.00",
"type": "DOUBLE"
}, {
"byteSize": 1,
"columnName": "ENTER_AT_PROCESS",
"rawData": "Y",
"type": "STRING"
}],
"dataCount": 0,
"dataMetaData": {
"connector": "mysql",
"pos": 1000368076,
"row": 0,
"ts_ms": 1625565737000,
"snapshot": "false",
"db": "testdb",
"table": "flow_person_t"
},
"key": "APPLY_PERSON_ID",
"memorySize": 1120,
"operation": "insert",
"rowIndex": -1,
"timestamp": "1970-01-01 00:00:00"
}
The POJO object is as below:
import lombok.Data;

@Data
public class HrSalaryPersonVO {
private String uuid;
private String orgId;
private String unitId;
private String effectiveDate;

private int adjustPersonCount;

private Double adjustAmount;

private Double beforeSalaryAmount;
private Double adjustRate;

private String data0prateType;

private String status;
}
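
For what it's worth, below is a rough sketch of the parsing half of such a deserializer
(untested). The column-to-field mapping shown is only a guess, since the sample columns do not
line up one-to-one with HrSalaryPersonVO, and hooking the class into a custom 'format.type'
additionally requires a table format factory (the legacy TableFactory SPI on 1.10,
DeserializationFormatFactory on 1.11+), which is omitted here:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class HrSalaryPersonDeserializationSchema implements DeserializationSchema<HrSalaryPersonVO> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public HrSalaryPersonVO deserialize(byte[] message) throws IOException {
        JsonNode root = MAPPER.readTree(message);

        // flatten the "data" array into columnName -> rawData
        Map<String, JsonNode> columns = new HashMap<>();
        for (JsonNode column : root.get("data")) {
            columns.put(column.get("columnName").asText(), column.get("rawData"));
        }

        HrSalaryPersonVO vo = new HrSalaryPersonVO();
        // illustrative mapping only - adjust to the real column/field correspondence
        if (columns.containsKey("UP_AMOUNT")) {
            vo.setAdjustAmount(columns.get("UP_AMOUNT").asDouble());
        }
        if (columns.containsKey("EXCHANGE_RATE")) {
            vo.setAdjustRate(columns.get("EXCHANGE_RATE").asDouble());
        }
        return vo;
    }

    @Override
    public boolean isEndOfStream(HrSalaryPersonVO nextElement) {
        return false;
    }

    @Override
    public TypeInformation<HrSalaryPersonVO> getProducedType() {
        return TypeInformation.of(HrSalaryPersonVO.class);
    }
}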



Re: How to read large amount of data from hive and write to redis, in a batch manner?

2021-07-08 Thread Yik San Chan
Hi Piotr,

Thanks! I end up doing option 1, and that works great.

Best,
Yik San

On Tue, May 25, 2021 at 11:43 PM Piotr Nowojski 
wrote:

> Hi,
>
> You could always buffer records in your sink function/operator, until a
> large enough batch is accumulated and upload the whole batch at once. Note
> that if you want to have at-least-once or exactly-once semantics, you would
> need to take care of those buffered records in one way or another. For
> example you could:
> 1. Buffer records on some in memory data structure (not Flink's state),
> and just make sure that those records are flushed to the underlying sink on
> `CheckpointedFunction#snapshotState()` calls
> 2. Buffer records on Flink's state (heap state backend or rocksdb - heap
> state backend would be the fastest with little overhead, but you can risk
> running out of memory), and that would easily give you exactly-once. That
> way your batch could span multiple checkpoints.
> 3. Buffer/write records to temporary files, but in that case keep in mind
> that those files need to be persisted and recovered in case of failure and
> restart.
> 4. Ignore checkpointing and either always restart the job from scratch or
> accept some occasional data loss.
>
> FYI, virtually every connector/sink is internally batching writes to some
> extent. Usually by doing option 1.
>
> Piotrek
>
> On Tue, May 25, 2021 at 14:50 Yik San Chan wrote:
>
>> Hi community,
>>
>> I have a Hive table that stores tens of millions rows of data. In my
>> Flink job, I want to process the data in batch manner:
>>
>> - Split the data into batches, each batch has (maybe) 10,000 rows.
>> - For each batch, call a batchPut() API on my redis client to dump in
>> Redis.
>>
>> Doing so in a streaming manner is not expected, as that will cause too
>> many round trips between Flink workers and Redis.
>>
>> Is there a way to do that? I find little clue in Flink docs, since almost
>> all APIs feel better suited for streaming processing by default.
>>
>> Thank you!
>>
>> Best,
>> Yik San
>>
>


Re: Apache Flink - How to use/invoke LookupTableSource/Function

2021-07-08 Thread M Singh
Thanks Jing for such a great explanation and references.
I will follow up on your references to understand the concepts you have explained so well.
Mans

On Wednesday, July 7, 2021, 11:21:39 PM EDT, JING ZHANG wrote:
Hi Mans,

Before coming to the next part, we may need some background about lookup joins and temporal joins.
1. A lookup join is typically used to enrich a table with data that is queried from an external system. It requires the right table to be backed by a lookup source connector. Its syntax is the same as a processing time temporal join [1].
2. Temporal joins take an arbitrary table and correlate each row to the corresponding row's relevant version in the versioned table (the right table). The right side could be joined based on processing time or event time.

The difference between a lookup join and a processing time temporal join is that a processing
time temporal join does not require the right table to be backed by a lookup source; the external
system behind the right table does not need the ability to materialize the table as a dynamic
table, because the processing time temporal join operator maintains the most recent version of
the dimension table. However, a lookup join requires the right table to be backed by a lookup
source, which means the external system behind the right table must be able to maintain the
latest snapshot of the dynamic table itself.

Now let's look at the question list.

> 1.1. Does this mean that this only works with proc time?
Yes, a lookup join only works with proc time. However, a temporal join [1] could use both proc time and event time.

> 1.2. Is there a way to deal with event time for orders and if so how?
You could use an event time temporal join [1] to deal with the event time case.

> 2.1 What is the purpose of using proc_time from the order side? Is it to only
> limit the latest order record for the lookup or is it to restrict the customer
> record? Does this mean that Flink tracks proc time for the customer table?
Proc_time ensures that each row of the Orders table is joined with those
Customers rows that match the join predicate at the point in time when the
Orders row is processed by the lookup join operator.
In a lookup join, the external storage should provide the ability to maintain the latest
snapshot of the dimension table. In a processing time temporal join, the join operator
maintains the most recent version of the dimension table.

> 2.2 Since the customer table does not have a time attribute, how does Flink
> keep track of changes to the customer table to make sure that it joins the order
> rows only with the customer row matching the order with the appropriate proc time?
Same as 2.1.

> 2.3. How does Flink make sure that the join result is not updated if the
> customer record is updated?
For both lookup joins and processing time temporal joins, dimension table updates would not
affect the result of the join because they would not trigger the join logic.

> 2.4 If we run the same application twice - with the customer record changing in
> between the two runs, since the orders table has proc time and the customer record
> does not show any time attribute, will the results of the join differ - since the
> customer record has changed during the two runs?
Yes.

Best,
JING ZHANG

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#event-time-temporal-join
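
For reference, an event time temporal join as described in [1] looks roughly like this (a
sketch: it assumes Orders carries an event time attribute order_time, and Customers is declared
as a versioned table, i.e. with a primary key and a watermarked event time column):

SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.order_time AS c
    ON o.customer_id = c.id;
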
M Singh wrote on Wed, Jul 7, 2021 at 5:22 PM:

Hi Jing:
Thanks for your explanation and references.

I looked at your reference
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join)
and have a few questions regarding the example code:
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

The explanation mentions:
A lookup join is typically used to enrich a table with data that is
queried from an external system. The join requires one table to have a
processing time attribute and the other table to be backed by a lookup source
connector.

Questions:
1. Does this mean that this only works with proc time?
2. Is there a way to deal with event time for orders and if so how?



The FOR SYSTEM_TIME AS OF clause with the subsequent processing time attribute 
ensures that each row of the Orders table is joined with those Customers rows 
that match the join predicate at the point in time when the Orders row is 
processed by the join operator. It also prevents that the join result is 
updated when a joined Customer row is updated in the future. 


In the above code fragment - the customer table does not have a time attribute,
only the order table has the proc_time attribute. So:
1. What is the purpose of using proc_time from the order side? Is it to only
limit 

Re: Flink Metric Reporting from Job Manager

2021-07-08 Thread Dawid Wysakowicz
Hi,

I think that is not directly supported. After all, the main method can
also be executed outside of a JobManager and there you don't have any
Flink context/connections/components set up.

Best,

Dawid

On 08/07/2021 00:12, Mason Chen wrote:
> Hi all,
>
> Does Flink support reporting metrics from the main method that is run on the 
> Job Manager? In this case, we want to report a failure to add an operator to 
> the Job Graph.
>
> Best,
> Mason





Re: Question about POJO rules - why fields should be public or have public setter/getter?

2021-07-08 Thread Dawid Wysakowicz
Hi Naehee,

The short answer is: for historical and compatibility reasons. It
was implemented that way back in the day, and we don't want to change
the default type extraction logic. Otherwise, user jobs that rely on the
default type extraction logic for storing state would end up with
state stored in a way that is incompatible with the updated serializer.

This is not a problem for Table/SQL programs as we control the state
internally, and that's why we were able to change the requirements for
POJOs in Table/SQL programs. [1]

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/types/#user-defined-data-types
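
For illustration, a minimal sketch (hypothetical class) that Flink's default type extraction would recognize as a POJO under the rules quoted below: public and standalone, a public no-argument constructor, and every non-static, non-transient field either public or exposed through a public getter/setter following the Java beans naming conventions.

```java
// Hypothetical example class; the names are made up.
public class Order {

    public long orderId;        // public, non-final field: no getter/setter needed

    private double amount;      // private field: requires a public getter and setter

    public Order() {}           // public no-argument constructor

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }
}
```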

On 08/07/2021 00:09, Naehee Kim wrote:
> According to the Flink doc, 
>
> Flink recognizes a data type as a POJO type (and allows “by-name”
> field referencing) if the following conditions are fulfilled:
>
>   * The class is public and standalone (no non-static inner class)
>   * The class has a public no-argument constructor
>   * All non-static, non-transient fields in the class (and all
> superclasses) are either public (and non-final) or have a public
> getter- and a setter- method that follows the Java beans naming
> conventions for getters and setters.
>
>
> PojoSerializer uses Java reflection to access an object's fields. One
> of PojoSerializer's constructors calls setAccessible(true) for all fields.
> for (int i = 0; i < numFields; i++) {
>    this.fields[i].setAccessible(true);
> }
> Then, to my knowledge, it can set a field regardless of the field's
> access control(private, public,..).
>
> However, its another constructor, called by PojoSerializerSnapshot,
> doesn't call setAccessible(true). Does anyone know the reason why
> setAccessible(true) is not called here? And why fields should be
> public or have a public gettter- and setter- method?
>
> Regards,
> Naehee
>
>




[Subject mis-encoded in the archive]

2021-07-08 Thread hbdrawn
(The original message is largely mis-encoded; only fragments are recoverable. Translated, they read roughly as follows.)

A Flink newcomer is running a Gelly job (the text also mentions "1.5") on a Flink standalone cluster of 4 machines, each with 128 GB of memory and 50 cores, and asks how to size TaskManager slots:

with 12 slots per TaskManager, roughly 100 GB / 12 ≈ 9 GB per slot and 12 * 4 = 48 slots in total;

with 50 slots per TaskManager, roughly 100 GB / 50 ≈ 2 GB per slot (the fragment also mentions "48 TM").

Which configuration is preferable?

Re: More detail information in sql validate exception

2021-07-08 Thread JING ZHANG
Hi Houyin,
Welcome to contribute to the community!
Before coding, we need to create a JIRA ticket or start a mailing list discussion
and reach consensus. You could describe the background in detail and give the
proposal if you already have one.
More information can be found in the contribution guide [1].
[1] https://flink.apache.org/contributing/contribute-code.html

Best,
JING ZHANG

纳兰清风  于2021年7月8日周四 下午12:00写道:

> Hi,
> Currently, when I use a lot of the same UDF in a SQL query, I can't locate
> where the semantic error occurs if some UDF is being used in a wrong way, so I tried to
> change some code in flink-table-common and flink-table-runtime-blink to
> extract more detailed information, such as the position and SQL context, in the
> exception-creating function.
> The Flink version is 1.13.
>
> How can I contribute this feature back to the community ?
>
>
>
>


Re: PyFlink performance and deployment issues

2021-07-08 Thread Xingbo Huang
Hi Wouter,

The JIRA issue is https://issues.apache.org/jira/browse/FLINK-23309. `bundle
time` should be chosen from the perspective of your end-to-end latency. Regarding `bundle
size`, a larger value generally provides better throughput, but it should
not be set too large, as that may mean no output is seen downstream for a
long time and the pressure during checkpointing becomes too great.
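
For reference, the two knobs mentioned above are the standard Flink configuration options `python.fn-execution.bundle.size` and `python.fn-execution.bundle.time`. A minimal sketch of the keys (shown on a plain Flink Configuration object in Java for illustration only; the same keys can also go into flink-conf.yaml or the job's table/stream configuration, and the values below are illustrative starting points, not recommendations):

```java
import org.apache.flink.configuration.Configuration;

public class PythonBundleConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Maximum number of elements buffered in a bundle before it is flushed
        // from the JVM to the Python worker.
        conf.setString("python.fn-execution.bundle.size", "1000");
        // Maximum time (in milliseconds) a bundle may be buffered before it is
        // flushed, which bounds the added end-to-end latency.
        conf.setString("python.fn-execution.bundle.time", "100");
        System.out.println(conf);
    }
}
```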

Best,
Xingbo

Wouter Zorgdrager  于2021年7月8日周四 下午4:32写道:

> Hi Xingbo, all,
>
> That is good to know, thank you. Is there any Jira issue I can track? I'm
> curious to follow this progress! Do you have any recommendations with
> regard to these two configuration values, to get somewhat reasonable
> performance?
>
> Thanks a lot!
> Wouter
>
> On Thu, 8 Jul 2021 at 10:26, Xingbo Huang  wrote:
>
>> Hi Wouter,
>>
>> In fact, our users have encountered the same problem. Whenever the
>> `bundle size` or `bundle time` is reached, the data in the buffer needs to
>> be sent from the jvm to the pvm, and then waits for the pym to be processed
>> and sent back to the jvm to send all the results to the downstream
>> operator, which leads to a large delay, especially when it is a small size
>> event as small messages are hard to be processed in pipeline.
>>
>> I have been solving this problem recently and I plan to make this
>> optimization to release-1.14.
>>
>> Best,
>> Xingbo
>>
>> Wouter Zorgdrager  于2021年7月8日周四 下午3:41写道:
>>
>>> Hi Dian, all,
>>>
>>>  I will come back to the other points asap. However, I’m still confused
>>> about this performance. Is this what I can expect in PyFlink in terms of
>>> performance? ~ 1000ms latency for single events? I also had a very simple
>>> setup where I send 1000 events to Kafka per second and response
>>> times/latencies was around 15 seconds for single events. I understand there
>>> is some Python/JVM overhead but since Flink is so performant, I would
>>> expect much better numbers. In the current situation, PyFlink would just be
>>> unusable if you care about latency. Is this something that you expect to be
>>> improved in the future?
>>>
>>> I will verify how this works out for Beam in a remote environment.
>>>
>>> Thanks again!
>>> Wouter
>>>
>>>
>>> On Thu, 8 Jul 2021 at 08:28, Dian Fu  wrote:
>>>
 Hi Wouter,

 1) Regarding the performance difference between Beam and PyFlink, I
 guess it’s because you are using an in-memory runner when running it
 locally in Beam. In that case, the code path is totally differently
 compared to running in a remote cluster.
 2) Regarding to `flink run`, I’m surprising that it’s running locally.
 Could you submit a java job with similar commands to see how it runs?
 3) Regarding to `flink run-application`, could you share the exception
 stack?

 Regards,
 Dian

 2021年7月6日 下午4:58,Wouter Zorgdrager  写道:

 uses





Re: PyFlink performance and deployment issues

2021-07-08 Thread Wouter Zorgdrager
Hi Xingbo, all,

That is good to know, thank you. Is there any Jira issue I can track? I'm
curious to follow this progress! Do you have any recommendations with
regard to these two configuration values, to get somewhat reasonable
performance?

Thanks a lot!
Wouter

On Thu, 8 Jul 2021 at 10:26, Xingbo Huang  wrote:

> Hi Wouter,
>
> In fact, our users have encountered the same problem. Whenever the `bundle
> size` or `bundle time` is reached, the data in the buffer needs to be sent
> from the jvm to the pvm, and then waits for the pym to be processed and
> sent back to the jvm to send all the results to the downstream operator,
> which leads to a large delay, especially when it is a small size event as
> small messages are hard to be processed in pipeline.
>
> I have been solving this problem recently and I plan to make this
> optimization to release-1.14.
>
> Best,
> Xingbo
>
> Wouter Zorgdrager  于2021年7月8日周四 下午3:41写道:
>
>> Hi Dian, all,
>>
>>  I will come back to the other points asap. However, I’m still confused
>> about this performance. Is this what I can expect in PyFlink in terms of
>> performance? ~ 1000ms latency for single events? I also had a very simple
>> setup where I send 1000 events to Kafka per second and response
>> times/latencies was around 15 seconds for single events. I understand there
>> is some Python/JVM overhead but since Flink is so performant, I would
>> expect much better numbers. In the current situation, PyFlink would just be
>> unusable if you care about latency. Is this something that you expect to be
>> improved in the future?
>>
>> I will verify how this works out for Beam in a remote environment.
>>
>> Thanks again!
>> Wouter
>>
>>
>> On Thu, 8 Jul 2021 at 08:28, Dian Fu  wrote:
>>
>>> Hi Wouter,
>>>
>>> 1) Regarding the performance difference between Beam and PyFlink, I
>>> guess it’s because you are using an in-memory runner when running it
>>> locally in Beam. In that case, the code path is totally differently
>>> compared to running in a remote cluster.
>>> 2) Regarding to `flink run`, I’m surprising that it’s running locally.
>>> Could you submit a java job with similar commands to see how it runs?
>>> 3) Regarding to `flink run-application`, could you share the exception
>>> stack?
>>>
>>> Regards,
>>> Dian
>>>
>>> 2021年7月6日 下午4:58,Wouter Zorgdrager  写道:
>>>
>>> uses
>>>
>>>
>>>


Re: PyFlink performance and deployment issues

2021-07-08 Thread Xingbo Huang
Hi Wouter,

In fact, our users have encountered the same problem. Whenever the `bundle
size` or `bundle time` threshold is reached, the data in the buffer needs to be
sent from the JVM to the Python VM, and the JVM then has to wait for the Python
VM to process it and send the results back before they can be forwarded to the
downstream operator. This leads to a large delay, especially for small events,
as small messages are hard to process in a pipelined fashion.

I have been solving this problem recently and I plan to make this
optimization to release-1.14.

Best,
Xingbo

Wouter Zorgdrager  于2021年7月8日周四 下午3:41写道:

> Hi Dian, all,
>
>  I will come back to the other points asap. However, I’m still confused
> about this performance. Is this what I can expect in PyFlink in terms of
> performance? ~ 1000ms latency for single events? I also had a very simple
> setup where I send 1000 events to Kafka per second and response
> times/latencies was around 15 seconds for single events. I understand there
> is some Python/JVM overhead but since Flink is so performant, I would
> expect much better numbers. In the current situation, PyFlink would just be
> unusable if you care about latency. Is this something that you expect to be
> improved in the future?
>
> I will verify how this works out for Beam in a remote environment.
>
> Thanks again!
> Wouter
>
>
> On Thu, 8 Jul 2021 at 08:28, Dian Fu  wrote:
>
>> Hi Wouter,
>>
>> 1) Regarding the performance difference between Beam and PyFlink, I guess
>> it’s because you are using an in-memory runner when running it locally in
>> Beam. In that case, the code path is totally differently compared to
>> running in a remote cluster.
>> 2) Regarding to `flink run`, I’m surprising that it’s running locally.
>> Could you submit a java job with similar commands to see how it runs?
>> 3) Regarding to `flink run-application`, could you share the exception
>> stack?
>>
>> Regards,
>> Dian
>>
>> 2021年7月6日 下午4:58,Wouter Zorgdrager  写道:
>>
>> uses
>>
>>
>>


flink-sql error when connecting to Kafka

2021-07-08 Thread yanyunpeng
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V

Querying Kafka with flink-sql.

Kafka version 2.4, connector version flink-sql-connector-kafka_2.11-1.11.2.jar.


What is the cause of this? Is it a connector version mismatch?

Re: PyFlink performance and deployment issues

2021-07-08 Thread Wouter Zorgdrager
Hi Dian, all,

 I will come back to the other points asap. However, I’m still confused
about this performance. Is this what I can expect in PyFlink in terms of
performance? ~ 1000ms latency for single events? I also had a very simple
setup where I sent 1000 events per second to Kafka, and response
times/latencies were around 15 seconds for single events. I understand there
is some Python/JVM overhead but since Flink is so performant, I would
expect much better numbers. In the current situation, PyFlink would just be
unusable if you care about latency. Is this something that you expect to be
improved in the future?

I will verify how this works out for Beam in a remote environment.

Thanks again!
Wouter


On Thu, 8 Jul 2021 at 08:28, Dian Fu  wrote:

> Hi Wouter,
>
> 1) Regarding the performance difference between Beam and PyFlink, I guess
> it’s because you are using an in-memory runner when running it locally in
> Beam. In that case, the code path is totally differently compared to
> running in a remote cluster.
> 2) Regarding to `flink run`, I’m surprising that it’s running locally.
> Could you submit a java job with similar commands to see how it runs?
> 3) Regarding to `flink run-application`, could you share the exception
> stack?
>
> Regards,
> Dian
>
> 2021年7月6日 下午4:58,Wouter Zorgdrager  写道:
>
> uses
>
>
>


RE: State Processor API and existing state

2021-07-08 Thread Tan, Min
Many thanks for your prompt reply.

Yes. They are the same operators.
What I did is just modify the content of the POJO, e.g., doubling the amount fields.

I am not able to send the production code, but I will put together a separate mocked
project to reproduce the issue and send you the mocked code later.

Regards,
Min

From: JING ZHANG 
Sent: 08 July 2021 04:45
To: Tan, Min 
Cc: Marco Villalobos ; user 
Subject: [External] Re: State Processor API and existing state

Hi min,
Is the POJO state in an existing operator or a newly added operator?
BTW, it would be great if you could provide code that reproduces the
exception. I need to debug further, based on the code, to find out the reason.


Tan, Min  于2021年7月8日周四 上午2:56写道:
Hi,

I have followed the steps below in restarting a Flink job with newly modified 
savepoints.

I can restart a job with new savepoints as long as the Flink state is
expressed in Java primitives.
When the Flink state is expressed in a POJO, my job does not restart; I
get the following exceptions.

Any ideas? Do I need to redefine any serializers?

Thank you very much for your help in advance.
Regards,

Min


-------------------- Flink exceptions --------------------
2021-07-07 19:02:58
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) 
from any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 
'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
at 
org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
at 

Re: PyFlink performance and deployment issues

2021-07-08 Thread Dian Fu
Hi Wouter,

1) Regarding the performance difference between Beam and PyFlink, I guess it’s 
because you are using an in-memory runner when running it locally in Beam. In 
that case, the code path is totally different compared to running in a remote 
cluster.
2) Regarding `flink run`, I’m surprised that it’s running locally. Could 
you submit a Java job with similar commands to see how it runs?
3) Regarding `flink run-application`, could you share the exception stack?

Regards,
Dian

> 2021年7月6日 下午4:58,Wouter Zorgdrager  写道:
> 
> uses



Re: Savepoints with bootstrapping a datastream function

2021-07-08 Thread Rakshit Ramesh
Yes! I was only worried about the job ID changing and the checkpoint becoming
un-referenceable.
But since I can pass a path to the checkpoint, that will not be an issue.


Thanks a lot for your suggestions!
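
For reference, a minimal sketch (Flink 1.13 DataStream API; the checkpoint interval, pipeline, and job name are placeholders) of enabling retained checkpoints, so the last completed checkpoint outlives the job and its path can be passed to the next run:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds; `state.checkpoints.dir` (e.g. in flink-conf.yaml)
        // must point to a durable location such as S3 or HDFS.
        env.enableCheckpointing(60_000);

        // Keep the last completed checkpoint when the job stops, so its path can be
        // supplied to the next submission, e.g. `flink run -s <checkpointPath> ...`.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Placeholder pipeline just to make the sketch runnable.
        env.fromSequence(0, Long.MAX_VALUE).print();

        env.execute("retained-checkpoint-sketch");
    }
}
```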

On Thu, 8 Jul 2021 at 11:26, Arvid Heise  wrote:

> Hi Rakshit,
>
> It sounds to me as if you don't need the Savepoint API at all. You can
> (re)start all applications with the previous state (be it retained
> checkpoint or savepoint). You just need to provide the path to that in your
> application invocation [1] (every entry point has such a parameter, you
> might need to check the respective documentation if you are not using CLI).
> Note that although it only says savepoint, starting from a checkpoint is
> fine as well (just not recommended in the beginning).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint
>
> On Thu, Jul 8, 2021 at 6:31 AM Rakshit Ramesh <
> rakshit.ram...@datakaveri.org> wrote:
>
>> Sorry for being a little vague there.
>> I want to create a Savepoint from a DataStream right before the job is
>> finished or cancelled.
>> What you have shown in the IT case is how a datastream can be
>> bootstrapped with state that is
>> formed by means of DataSet.
>> My jobs are triggered by a scheduler periodically (every day) using the
>> api and I would like
>> to bootstrap each day's job with the state of the previous day.
>>
>> But thanks for the input on the Checkpoint behaviour wrt a FINISHED
>> state,
>> I think that will work for me.
>>
>> Thanks!
>>
>> On Thu, 8 Jul 2021 at 02:03, Arvid Heise  wrote:
>>
>>> I don't quite understand your question. You use Savepoint API to create
>>> a savepoint with a batch job (that's why it's DataSet Transform currently).
>>> That savepoint can only be restored through a datastream application.
>>> Dataset applications cannot start from a savepoint.
>>>
>>> So I don't understand why you see a difference between "restoring a
>>> savepoint to a datastream" and "create a NewSavepoint for a datastream".
>>> It's ultimately the same thing for me. Just to be very clear: the main
>>> purpose of Savepoint API is to create the initial state of a datastream
>>> application.
>>>
>>> For your second question, yes retained checkpoints outlive the job in
>>> all regards. It's the users responsibility to eventually clean that up.
>>>
>>>
>>>
>>> On Wed, Jul 7, 2021 at 6:56 PM Rakshit Ramesh <
>>> rakshit.ram...@datakaveri.org> wrote:
>>>
 Yes I could understand restoring a savepoint to a datastream.
 What I couldn't figure out is to create a NewSavepoint for a datastream.
 What I understand is that NewSavepoints only take in Bootstrap
 transformation for Dataset Transform functions.


 About the checkpoints, does
  CheckpointConfig.ExternalizedCheckpointCleanup =
 RETAIN_ON_CANCELLATION
 offer the same behaviour when the job is "FINISHED" and not "CANCELLED"
 ?

 What I'm looking for is a way to retain the state for a bounded job so
 that the state is reloaded on the next job run (through api).
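
For reference, a minimal sketch of this bootstrap step using the Flink 1.13 State Processor API (the value class, operator uid, and savepoint path are made up): a DataSet holding the previous day's values is written into keyed state in a new savepoint, and the next day's DataStream job is then started from that path with a keyed operator that uses the same uid and a compatible state descriptor.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapSavepointSketch {

    // Hypothetical value type written into keyed state.
    public static class Total {
        public long key;
        public double amount;
        public Total() {}
        public Total(long key, double amount) { this.key = key; this.amount = amount; }
    }

    // Writes one ValueState entry per key.
    public static class TotalBootstrapper extends KeyedStateBootstrapFunction<Long, Total> {
        private transient ValueState<Double> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("total", Double.class));
        }

        @Override
        public void processElement(Total value, Context ctx) throws Exception {
            state.update(value.amount);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The previous run's values; in practice read from durable storage.
        DataSet<Total> previousTotals = env.fromElements(
                new Total(1L, 10.0), new Total(2L, 42.0));

        BootstrapTransformation<Total> transformation = OperatorTransformation
                .bootstrapWith(previousTotals)
                .keyBy(new KeySelector<Total, Long>() {
                    @Override
                    public Long getKey(Total t) { return t.key; }
                })
                .transform(new TotalBootstrapper());

        Savepoint
                .create(new MemoryStateBackend(), 128)            // max parallelism
                .withOperator("my-keyed-operator-uid", transformation)
                .write("s3://bucket/savepoints/bootstrap");

        env.execute("bootstrap-savepoint");
    }
}
```

The DataStream job that should pick this state up would assign `uid("my-keyed-operator-uid")` to its keyed operator, declare the same "total" ValueStateDescriptor, and be submitted with the written savepoint path as the restore path.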

 On Wed, 7 Jul 2021 at 14:18, Arvid Heise  wrote:

> Hi Rakshit,
>
> The example is valid. The state processor API is kinda working like a
> DataSet application but the state is meant to be read in DataStream. 
> Please
> check out the SavepointWriterITCase [1] for a full example. There is no
> checkpoint/savepoint in DataSet applications.
>
> Checkpoints can be stored on different checkpoint storages, such as S3
> or HDFS. If you use RocksDB state backend, Flink pretty much just copy the
> SST files of RocksDB to S3. Checkpoints are usually bound to the life of 
> an
> application. So they are created by the application and deleted on
> termination.
> However, you can resume an application both from savepoint and
> checkpoints. Checkpoints can be retained [2] to avoid them being deleted 
> by
> the application during termination. But that's considered an advanced
> feature and you should first try it with savepoints.
>
> [1]
> https://github.com/apache/flink/blob/release-1.13.0/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java#L141-L141
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#retained-checkpoints
>
> On Mon, Jul 5, 2021 at 5:56 PM Rakshit Ramesh <
> rakshit.ram...@datakaveri.org> wrote:
>
>> I'm trying to bootstrap state into a KeyedProcessFunction equivalent
>> that takes in
>> a DataStream but I'm unable to find a reference for the same.
>> I found this gist
>> https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>> But it seems to only apply for DataSet.
>> My usecase is to manually trigger a Savepoint into s3 for later reuse.
>> I'm also guessing that