Re:Re: Flink upgraded to version 1.12.0 and started from SavePoint to report an error

2021-05-19 Thread 王炳焱
Hi
  Thank you very much for your reply. I have also tried the state-processor-api, but 
the job graph generated by SQL does not expose a UID for each operator, so the 
state-processor-api cannot read the original state either and there is no way to 
manipulate it. If you come up with a better solution, please reply to this thread.


Thanks




On 2021-05-20 10:46:22, "Yun Tang" wrote:
>Hi
>
>BaseRowSerializer was renamed to RowDataSerializer in Flink 1.11, so even the 
>state-processor-API cannot handle a class that no longer exists on the classpath. 
>One rather involved workaround is to copy the old BaseRowSerializer class out 
>without changing its package, and then use the state-processor-API to convert the 
>relevant state to RowDataSerializer. However, since your job graph is generated 
>from SQL and the state-processor-API is oriented more toward the DataStream API, 
>this will likely be quite difficult. I cannot think of a particularly good 
>solution either.
>
>Best regards
>Yun Tang
>
>From: 王炳焱 <15307491...@163.com>
>Sent: Tuesday, May 18, 2021 20:02
>To: user-zh@flink.apache.org 
>Subject: Flink upgraded to version 1.12.0 and started from SavePoint to report 
>an error
>
>When upgrading from Flink 1.10 to Flink 1.12, a Flink 1.10 SQL API job cannot be 
>restored from its savepoint. The error is as follows:
>
>
>2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup 
>[] - The operator name Calc(select=[((CAST((log_info 
>get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
>_UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
>_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
>_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
>_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
>_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
>_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
>get_json_object2 _UTF-16LE'status') SEARCH 
>Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
>"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
>get_json_object2 _UTF-16LE'data.itemType') SEARCH 
>Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
>(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
>"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
>(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
>"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
>get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
>characters length limit and was truncated.
>2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup 
>[] - The operator name 
>SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
>fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
>truncated.
>2021-05-14 22:02:44,879 ERROR 
>org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
>Caught unexpected exception.
>java.io.IOException: Could not find class 
>'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
> in classpath.
>at 
>org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565)

Re: Guidance for Integration Tests with External Technologies

2021-05-19 Thread Yun Gao
Hi Rion,

Do you mean you are running the tests directly in an IDE like IntelliJ IDEA for 
"multiple tests run in sequence"?
If a test succeeds when run separately but fails when run in sequence, then it 
seems the other tests may still be affecting the failing test.

For the consume failure, are there errors, or does Flink simply fetch no data? From 
the description I reckon the failure might not be related to the Flink job 
structure. If we suspect this point, could we first change it to a simpler job to 
see whether the fetched Kafka records meet expectations?

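For illustration, a minimal diagnostic job along the lines suggested above could
look like the sketch below; the topic name, broker address and group id are
placeholders, and it simply prints whatever records are fetched:

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaFetchCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "fetch-check");             // placeholder, fresh group id

        // Read the topic under test as plain strings and print every record,
        // so we can verify that the expected data is actually being fetched.
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-test-topic", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest();

        env.addSource(consumer).print();
        env.execute("kafka-fetch-check");
    }
}
```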
Best,
Yun



 --Original Mail --
Sender:Rion Williams 
Send Date:Wed May 19 07:14:05 2021
Recipients:user 
Subject:Guidance for Integration Tests with External Technologies
Hey all,

I’ve been taking a very TDD-oriented approach to developing many of the Flink 
apps I’ve worked on, but recently I’ve encountered a problem that has me 
scratching my head.

A majority of my integration tests leverage a few external technologies such as 
Kafka and typically a relational database like Postgres. I’ve found 
in-memory/embedded versions of these that have worked well in the past to allow 
me to:

- send messages into a kafka topic
- run my exact Flink job asynchronously 
- verify my results / assertions in Postgres via awaitility

Recently, I had a use case for broadcast state in a job and found that my tests 
would run successfully when executed individually, but when multiple tests run in 
sequence (in the same file), Flink seems to fail to consume from the topics and the 
assertion eventually fails.

I’ve tried several approaches including:
- ensuring that each Flink job is passed a unique consumer.id / group.id / 
application.id
- ensuring each test has brand new Kafka topics specific for it
- spinning up a new Flink cluster / Kafka cluster / Postgres instance per test

I’m not entirely sure what could be causing the problem but it only occurs for 
Flink jobs that read from two topics and leverage broadcast state. All other 
integration tests that use Kafka/Flink/Postgres still pass and can be run in 
sequence.
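
For context, the rough shape of such a two-topic job with broadcast state is
sketched below; the stream types, state descriptor and enrichment logic are made-up
placeholders, not taken from the original code:

```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastJobShape {

    // Descriptor for the broadcast ("rules") state shared by both sides.
    static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    public static DataStream<String> build(DataStream<String> events, DataStream<String> control) {
        BroadcastStream<String> rules = control.broadcast(RULES);

        return events
                .connect(rules)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        // Enrich each event with whatever rule was broadcast (if any).
                        String rule = ctx.getBroadcastState(RULES).get("active-rule");
                        out.collect(rule == null ? event : event + "|" + rule);
                    }

                    @Override
                    public void processBroadcastElement(String rule, Context ctx, Collector<String> out)
                            throws Exception {
                        // Every parallel instance stores the broadcast element.
                        ctx.getBroadcastState(RULES).put("active-rule", rule);
                    }
                });
    }
}
```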

Any advice / examples / recommendations would be helpful. I’d be happy to 
elaborate and provide code whenever possible as well.

Thanks,

Rion

Re: Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Yun Gao
Hi Marco,

I think Flink does not need 500GB of local space for the source; the source should
be able to read from S3 in a streaming fashion (namely open the file,
create an input stream and fetch data as required). 

But it might indeed need disk space for the intermediate data
between operators and for the sort operator. The amount of space
depends on how the operators scale the data volume up or down.
Another point is that if the parallelism is > 1, 
each single machine should require less disk space.

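As a small illustration of reading the S3 input in a streaming fashion with the
DataStream API in BATCH mode (the bucket path is a placeholder):

```
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3BatchRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bounded input + BATCH mode: records are streamed from S3,
        // not buffered locally as a whole.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.readTextFile("s3://my-bucket/time-series/")  // placeholder path; read as a bounded stream
           .print();                                     // parsing/aggregation would go here instead

        env.execute("s3-batch-read");
    }
}
```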
Best,
Yun



 --Original Mail --
Sender:Marco Villalobos 
Send Date:Thu May 20 01:16:39 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Questions Flink DataStream in BATCH execution mode scalability 
advice


> On May 19, 2021, at 7:26 AM, Yun Gao  wrote:
> 
> Hi Marco,
> 
> For the remaining issues, 
> 
> 1. For the aggregation, the 500GB of files are not required to be fit into 
> memory.
> Rough speaking for the keyed().window().reduce(), the input records would be 
> first
> sort according to the key (time_series.name) via external sorts, which only 
> consumes
> a fix amount of memory. Then for all the records of each key, flink would use 
> a special 
> single key statebackend to hold the intermediate result for this key, whose 
> memory
> consumption usually could be ignored. 

Thank you. However, if the source of data is s3, this means that locally the 
machine must have disk space to store the files from s3 and any intermediate 
files, right?

Considering that the source of data is 500 GB, doesn't this mean that the 
machine will need at least 500 GB of disk space?


> 2. The first operator (namely the source) would read the files direclty and 
> emit to the
> following operators directly, thus there should be no intermediate result 
> before the first
> operator. 
> 3. I wonder now flink does not support using S3 to store the intermediate 
> result, since it
> relies on local I/O mechanisms like mmap or local file read/write, and S3 
> seems not
> support. EBS seems to be ok.
> 4. The heartbeat timeout happens normally due to akka thread get blocked or 
> network issue. 
> To check if thread get blocked, you may first check the GC log to see if 
> there are long full gc, 
> if not, then check if the JM or TM akka thread get blocked via thread dump. 
> If it seems to be 
> the network issues, the job could configure heartbeat.timeout [1] to increase 
> the timeout.
> 
> Best,
> Yun
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
> 
> 
> --Original Mail --
> Sender:Marco Villalobos 
> Send Date:Wed May 19 14:03:48 2021
> Recipients:user 
> Subject:Questions Flink DataStream in BATCH execution mode scalability advice
> Questions Flink DataStream in BATCH execution mode scalability advice.
> 
> Here is the problem that I am trying to solve.
> 
> Input is an S3 bucket directory with about 500 GB of data across many files. 
> The instance that I am running on only has 50GB of EBS storage. The nature of 
> this data is time series data. Imagine name, value, timestamp.
> 
> I must average the time_series.value by time_series.name on a tumbling window 
> of 15 minutes. Upon aggregation, the time_series.timestamp gets rounded up a 
> quarter.  I key by tag name and 15-minute interval.
> 
> After aggregation, I must forward fill the missing quarters for each 
> time_series.name. Currently, this forward fill operator is keyed only by 
> time_series.name. Does this mean that in batch mode, all of the time series 
> with the same time_series.name within the 500 gb of files must fit in memory?
> 
> The results are saved in a rdbms.
> 
> If this job somehow reads all 500 GB before it sends it to the first 
> operator, where is the data store?
> 
> Now considering that the EMR node only has 50GB of ebs (that's disk storage), 
> is there a means to configure Flink to store its intermediate results within 
> S3?
> 
> When the job failed, I saw this exception in the log: "Recovery is suppressed 
> by NoRestartBackoffTimeStrategy." Is there a way to configure this to recover?
> 
> My job keeps on failing for the same reason, it says, "The heartbeat of 
> TaskManager with id container_xxx timed out." Is there a way to configure it 
> not to timeout?
> 
> I would appreciate any advice on how I should save these problems. Thank you.
> 
> 

Re: Flink upgraded to version 1.12.0 and started from SavePoint to report an error

2021-05-19 Thread Yun Tang
Hi

BaseRowSerializer was renamed to RowDataSerializer in Flink 1.11, so even the 
state-processor-API cannot handle a class that no longer exists on the classpath. 
One rather involved workaround is to copy the old BaseRowSerializer class out 
without changing its package, and then use the state-processor-API to convert the 
relevant state to RowDataSerializer. However, since your job graph is generated 
from SQL and the state-processor-API is oriented more toward the DataStream API, 
this will likely be quite difficult. I cannot think of a particularly good 
solution either.

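For reference, this is roughly how keyed state would be read with the State
Processor API in Flink 1.12 when an operator UID is known (requires the
flink-state-processor-api dependency; the paths, UID, state name and types below
are placeholders). As noted above, this route is blocked for SQL-generated job
graphs because the operator UIDs are not exposed:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadKeyedState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load the old savepoint with the same kind of state backend the job used.
        ExistingSavepoint savepoint = Savepoint.load(
                env, "hdfs:///savepoints/savepoint-xxxx", new RocksDBStateBackend("hdfs:///checkpoints"));

        // Read the keyed state of one operator. This requires the operator's UID,
        // which SQL-generated job graphs unfortunately do not expose.
        DataSet<String> entries = savepoint.readKeyedState("my-operator-uid", new Reader());
        entries.print();
    }

    static class Reader extends KeyedStateReaderFunction<String, String> {
        ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("my-state", String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }
}
```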
Best regards
Yun Tang

From: 王炳焱 <15307491...@163.com>
Sent: Tuesday, May 18, 2021 20:02
To: user-zh@flink.apache.org 
Subject: Flink upgraded to version 1.12.0 and started from SavePoint to report 
an error

When upgrading from Flink 1.10 to Flink 1.12, a Flink 1.10 SQL API job cannot be 
restored from its savepoint. The error is as follows:


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name Calc(select=[((CAST((log_info 
get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
_UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
get_json_object2 _UTF-16LE'status') SEARCH 
Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
get_json_object2 _UTF-16LE'data.itemType') SEARCH 
Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name 
SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
truncated.
2021-05-14 22:02:44,879 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Caught unexpected exception.
java.io.IOException: Could not find class 
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
 in classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 

Unsubscribe

2021-05-19 Thread zander0...@163.com
Unsubscribe



周德虎
 
Phone: 15021351770
Email: zander0...@163.com



Re: flink 1.13.0: does Flink SQL support multiple schemas when connecting to a database, i.e. table names of the form schema.name?

2021-05-19 Thread Shengkai Fang
Do you want to use a regular expression to match tables in the database? Is 'org.users' meant to be a regular expression?

Best,
Shengkai

On Wed, May 19, 2021 at 2:01 PM, Asahi Lee <978466...@qq.com> wrote:

> hi!
> Does the Flink JDBC connector plan to support schema-qualified table names? For
> example, a table-name written like this:
> CREATE TABLE MyUserTable (
>   id BIGINT,
>   name STRING,
>   age INT,
>   status BOOLEAN,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>   'table-name' = 'org.users'
> );


Re: two questions about flink stream processing: kafka sources and TimerService

2021-05-19 Thread Jin Yi
thanks ingo!  i'll look at moving to rolling my own operator and using
ConnectedStreams.transform with it.

On Tue, May 18, 2021 at 3:18 AM Ingo Bürk  wrote:

> Hi Jin,
>
> 1) As far as I know the order is only guaranteed for events from the same
> partition. If you want events across partitions to remain in order you may
> need to use parallelism 1. I'll attach some links here which might be
> useful:
>
>
> https://stackoverflow.com/questions/50340107/order-of-events-with-chained-keyby-calls-on-same-key
>
> https://stackoverflow.com/questions/44156774/ordering-of-records-in-a-keyed-stream-in-flink
>
> https://stackoverflow.com/questions/50573174/flink-kafka-producer-elements-out-of-order
>
> 2) Indeed there doesn't seem to be a way to access the
> InternalTimerService from a ProcessFunction at the moment. One approach
> could be to implement this yourself using a MapState. Otherwise I think you
> need to implement your own operator from which you can then access
> InternalTimerService similar to how KeyedCoProcessOperator does it as well.
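>
> To illustrate the MapState workaround mentioned above (a sketch under assumed
> names and retention intervals, not the IntervalJoin mechanism itself): record
> which side registered each timer, so that onTimer only cleans that side's state.
>
> ```
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class PerSideCleanup extends KeyedCoProcessFunction<String, String, String, String> {
>
>     // timer timestamp -> which side registered it (true = left, false = right)
>     private transient MapState<Long, Boolean> timerSide;
>
>     @Override
>     public void open(Configuration parameters) {
>         timerSide = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("timer-side", Types.LONG, Types.BOOLEAN));
>     }
>
>     @Override
>     public void processElement1(String left, Context ctx, Collector<String> out) throws Exception {
>         long cleanupAt = ctx.timestamp() + 60_000; // hypothetical left-side retention
>         ctx.timerService().registerEventTimeTimer(cleanupAt);
>         timerSide.put(cleanupAt, true);
>         // ... store the left event in its own keyed state ...
>     }
>
>     @Override
>     public void processElement2(String right, Context ctx, Collector<String> out) throws Exception {
>         long cleanupAt = ctx.timestamp() + 10_000; // hypothetical right-side retention
>         ctx.timerService().registerEventTimeTimer(cleanupAt);
>         timerSide.put(cleanupAt, false);
>         // ... store the right event in its own keyed state ...
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
>         Boolean isLeft = timerSide.get(timestamp);
>         timerSide.remove(timestamp);
>         if (Boolean.TRUE.equals(isLeft)) {
>             // clean only the left-side state for this key/timestamp
>         } else {
>             // clean only the right-side state
>         }
>     }
> }
> ```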
>
>
> Regards
> Ingo
>
> On Wed, May 12, 2021 at 8:32 AM Jin Yi  wrote:
>
>> hello.  thanks ahead of time for anyone who answers.
>>
>> 1.  verifying my understanding: for a kafka source that's partitioned on
>> the same piece of data that is later used in a keyBy, if we are relying on
>> the kafka timestamp as the event timestamp, is it guaranteed that the event
>> stream of the source is in the kafka pipeline's insertion order for the
>> topic?
>>
>> 2.  is there a way to use the InternalTimerService from within a
>> ProcessFunction (specifically, a KeyedCoProcessFunction)?  i don't see an
>> easy way to do this, except by changing the TimerService interface.  the
>> use case for my need is that i'd like to have timers to clean up the left
>> and right keyed state using namespaced timers like how IntervalJoin does it
>> (
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L256).
>> right now, b/c the KeyedCoProcessFunction only gives us the
>> SimpleTimerService via the Context, i can only trigger onTimer execution
>> without being able to refine the cleaning of state to just the event state
>> of the side that a timer was originated from.  without this, it'll end up
>> needing to visit state associated with both event streams which isn't
>> performant as those streams can have different throughputs (and therefore,
>> expect to have different retention characteristics/needs).
>>
>> thanks.
>>
>


Re: Prometheus Reporter Enhancement

2021-05-19 Thread Mason Chen
Are there any plans to rework some of the metric name formulations 
(getMetricIdentifier or getLogicalScope)? Currently, the label keys and/or 
label values are concatenated in the metric name and the information is 
redundant and makes the metric names longer.

Would it make sense to remove the tag related information (getAllVariables())?

> On May 18, 2021, at 3:45 PM, Chesnay Schepler  wrote:
> 
> There is already a ticket for this. Note that this functionality should be 
> implemented in a generic fashion to be usable for all reporters.
> 
> https://issues.apache.org/jira/browse/FLINK-17495 
> 
> 
> On 5/18/2021 8:16 PM, Andrew Otto wrote:
>> Sounds useful!
>> 
>> On Tue, May 18, 2021 at 2:02 PM Mason Chen > > wrote:
>> Hi all,
>> 
>> Would people appreciate enhancements to the prometheus reporter to include 
>> extra labels via a configuration, as a contribution to Flink? I can see it 
>> being useful for adding labels that are not job specific, but infra specific.
>> 
>> The change would be nicely integrated with the Flink’s ConfigOptions and 
>> unit tested.
>> 
>> Best,
>> Mason
> 



Parallelism in Production: Best Practices

2021-05-19 Thread Yaroslav Tkachenko
Hi everyone,

I'd love to learn more about how different companies approach specifying
Flink parallelism. I'm specifically interested in real, production
workloads.

I can see a few common patterns:

- Rely on default parallelism, scale by changing parallelism for the whole
pipeline. I guess it only works if the pipeline doesn't have obvious
bottlenecks. Also, it looks like the new reactive mode makes specifying
parallelism for an operator obsolete (
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/elastic_scaling/#configuration
)

- Rely on default parallelism for most of the operators, but override it
for some. For example, it doesn't make sense for a Kafka source to have
parallelism higher than the number of partitions it consumes. Some custom
sinks could choose lower parallelism to avoid overloading their
destinations. Some transformation steps could choose higher parallelism to
distribute the work better, etc.

- Don't rely on default parallelism and configure parallelism
explicitly for each operator. This requires very good knowledge of each
operator in the pipeline, but it could lead to very good performance.
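
For concreteness, a minimal sketch of the "default parallelism plus selective
overrides" pattern (the operator choices and numbers are made up):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);                // default for the whole pipeline

        env.fromElements("a", "b", "c")       // a Kafka source would instead match its partition count
           .filter(s -> !s.isEmpty())
           .setParallelism(16)                // e.g. a heavy transformation gets more subtasks
           .addSink(new PrintSinkFunction<>())
           .setParallelism(2);                // e.g. a sink kept low to protect its destination

        env.execute("parallelism-example");
    }
}
```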

Is there a different pattern that I miss? What do you use? Feel free to
share any resources.

If you do specify it explicitly, what do you think about the reactive mode?
Will you use it?

Also, how often do you change parallelism? Do you set it once and forget
once the pipeline is stable? Do you keep re-evaluating it?

Thanks.


Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Marco Villalobos



> On May 19, 2021, at 7:26 AM, Yun Gao  wrote:
> 
> Hi Marco,
> 
> For the remaining issues, 
> 
> 1. For the aggregation, the 500GB of files are not required to be fit into 
> memory.
> Rough speaking for the keyed().window().reduce(), the input records would be 
> first
> sort according to the key (time_series.name) via external sorts, which only 
> consumes
> a fix amount of memory. Then for all the records of each key, flink would use 
> a special 
> single key statebackend to hold the intermediate result for this key, whose 
> memory
> consumption usually could be ignored. 

Thank you. However, if the source of data is s3, this means that locally the 
machine must have disk space to store the files from s3 and any intermediate 
files, right?

Considering that the source of data is 500 GB, doesn't this mean that the 
machine will need at least 500 GB of disk space?


> 2. The first operator (namely the source) would read the files direclty and 
> emit to the
> following operators directly, thus there should be no intermediate result 
> before the first
> operator. 
> 3. I wonder now flink does not support using S3 to store the intermediate 
> result, since it
> relies on local I/O mechanisms like mmap or local file read/write, and S3 
> seems not
> support. EBS seems to be ok.
> 4. The heartbeat timeout happens normally due to akka thread get blocked or 
> network issue. 
> To check if thread get blocked, you may first check the GC log to see if 
> there are long full gc, 
> if not, then check if the JM or TM akka thread get blocked via thread dump. 
> If it seems to be 
> the network issues, the job could configure heartbeat.timeout [1] to increase 
> the timeout.
> 
> Best,
> Yun
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
> 
> 
> --Original Mail --
> Sender:Marco Villalobos 
> Send Date:Wed May 19 14:03:48 2021
> Recipients:user 
> Subject:Questions Flink DataStream in BATCH execution mode scalability advice
> Questions Flink DataStream in BATCH execution mode scalability advice.
> 
> Here is the problem that I am trying to solve.
> 
> Input is an S3 bucket directory with about 500 GB of data across many files. 
> The instance that I am running on only has 50GB of EBS storage. The nature of 
> this data is time series data. Imagine name, value, timestamp.
> 
> I must average the time_series.value by time_series.name on a tumbling window 
> of 15 minutes. Upon aggregation, the time_series.timestamp gets rounded up a 
> quarter.  I key by tag name and 15-minute interval.
> 
> After aggregation, I must forward fill the missing quarters for each 
> time_series.name. Currently, this forward fill operator is keyed only by 
> time_series.name. Does this mean that in batch mode, all of the time series 
> with the same time_series.name within the 500 gb of files must fit in memory?
> 
> The results are saved in a rdbms.
> 
> If this job somehow reads all 500 GB before it sends it to the first 
> operator, where is the data store?
> 
> Now considering that the EMR node only has 50GB of ebs (that's disk storage), 
> is there a means to configure Flink to store its intermediate results within 
> S3?
> 
> When the job failed, I saw this exception in the log: "Recovery is suppressed 
> by NoRestartBackoffTimeStrategy." Is there a way to configure this to recover?
> 
> My job keeps on failing for the same reason, it says, "The heartbeat of 
> TaskManager with id container_xxx timed out." Is there a way to configure it 
> not to timeout?
> 
> I would appreciate any advice on how I should save these problems. Thank you.
> 
> 



Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Yun Gao
Hi Marco,

For the remaining issues, 

1. For the aggregation, the 500GB of files is not required to fit into memory.
Roughly speaking, for keyed().window().reduce() the input records are first
sorted by key (time_series.name) via external sorts, which only consumes a
fixed amount of memory. Then, for all the records of each key, Flink uses a
special single-key state backend to hold the intermediate result for that key,
whose memory consumption can usually be ignored. (A sketch of such a
batch-mode windowed aggregation follows after this list.)
2. The first operator (namely the source) reads the files directly and emits
records to the following operators directly, so there is no intermediate
result before the first operator.
3. I'm afraid Flink does not currently support using S3 to store intermediate
results, since that relies on local I/O mechanisms like mmap or local file
read/write, which S3 does not support. EBS should be fine.
4. The heartbeat timeout normally happens because an Akka thread gets blocked
or due to a network issue.
To check whether a thread is blocked, first check the GC log for long full GCs;
if there are none, check via a thread dump whether the JM or TM Akka thread is
blocked. If it looks like a network issue, the job can increase
heartbeat.timeout [1] (a configuration sketch follows the reference below).

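As referenced in point 1 above, here is a hedged sketch (not taken from the
original thread) of a keyed 15-minute tumbling-window aggregation running in
DataStream BATCH mode; the POJO, the placeholder source and the sum-style reduce
are illustrative assumptions:

```
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class BatchWindowAverage {

    /** Simple POJO standing in for one time-series record. */
    public static class Reading {
        public String name;
        public double value;
        public long timestamp;

        public Reading() {}
        public Reading(String name, double value, long timestamp) {
            this.name = name; this.value = value; this.timestamp = timestamp;
        }
        @Override
        public String toString() { return name + "=" + value + "@" + timestamp; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // bounded input -> batch-style execution

        // Placeholder source; a real job would read and parse the S3 files here.
        DataStream<Reading> series = env.fromElements(
                new Reading("sensor-1", 1.0, 0L),
                new Reading("sensor-1", 3.0, 60_000L))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Reading>forMonotonousTimestamps()
                    .withTimestampAssigner((r, ts) -> r.timestamp));

        series
            .keyBy(r -> r.name)                                     // key by time_series.name
            .window(TumblingEventTimeWindows.of(Time.minutes(15)))  // 15-minute tumbling window
            .reduce((a, b) -> new Reading(a.name, a.value + b.value, Math.max(a.timestamp, b.timestamp)))
            .print(); // a real job would compute the average (sum/count) and write to the RDBMS sink

        env.execute("batch-window-average");
    }
}
```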
Best,
Yun

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
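
And for point 4, heartbeat.timeout can be raised either in flink-conf.yaml
(e.g. heartbeat.timeout: 300000) or, for local/embedded runs, programmatically;
a minimal sketch with an arbitrarily chosen value:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeartbeatConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Raise the heartbeat timeout from the default 50s to 5 minutes (milliseconds).
        conf.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 300_000L);

        // Only honored for local/embedded execution; on a cluster put
        // heartbeat.timeout into flink-conf.yaml instead.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        // ... build and execute the job ...
    }
}
```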



 --Original Mail --
Sender:Marco Villalobos 
Send Date:Wed May 19 14:03:48 2021
Recipients:user 
Subject:Questions Flink DataStream in BATCH execution mode scalability advice

Questions Flink DataStream in BATCH execution mode scalability advice.

Here is the problem that I am trying to solve.

Input is an S3 bucket directory with about 500 GB of data across many files. 
The instance that I am running on only has 50GB of EBS storage. The nature of 
this data is time series data. Imagine name, value, timestamp.

I must average the time_series.value by time_series.name on a tumbling window 
of 15 minutes. Upon aggregation, the time_series.timestamp gets rounded up a 
quarter.  I key by tag name and 15-minute interval.

After aggregation, I must forward fill the missing quarters for each 
time_series.name. Currently, this forward fill operator is keyed only by 
time_series.name. Does this mean that in batch mode, all of the time series 
with the same time_series.name within the 500 gb of files must fit in memory?

The results are saved in a rdbms.

If this job somehow reads all 500 GB before it sends it to the first operator, 
where is the data store?

Now considering that the EMR node only has 50GB of ebs (that's disk storage), 
is there a means to configure Flink to store its intermediate results within S3?

When the job failed, I saw this exception in the log: "Recovery is suppressed 
by NoRestartBackoffTimeStrategy." Is there a way to configure this to recover?

My job keeps on failing for the same reason, it says, "The heartbeat of 
TaskManager with id container_xxx timed out." Is there a way to configure it 
not to timeout?

I would appreciate any advice on how I should save these problems. Thank you.




[no subject]

2021-05-19 Thread Wenyi Xu



Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Thanks Dian. It worked for me

Regards,
Zerah

On Wed, May 19, 2021, 5:14 PM Dian Fu  wrote:

> Hi Zerah,
>
> You could try to replace
> ```
> value_schema = avro.schema.parse()
> ```
>
> with the following code:
> ```
> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
> value_schema = JSchemaParser().parse(value_schema_str)
> ```
>
> The reason is that ```value_schema = avro.schema.parse( here>) ``` will create a Python object instead of Java object.
>
> Regards,
> Dian
>
> On May 19, 2021 at 5:23 PM, Zerah J wrote:
>
> Hi Dian,
>
> Type of value_schema is <*class 'avro.schema.RecordSchema*'>
>
> I have only a Json schema string and schema registry url. Please find
> below snippet :
>
> import avro.schema
>
> value_schema_str = """
> {
>   "namespace": "com.nextgen.customer",
>   "type": "record",
>   "name": "employee",
>   "doc": "Customer Details Value Schema.",
>   "fields": [
> {
>   "doc": "String Value",
>   "name": "emp_name",
>   "type": "string"
> },
> {
>   "doc": "String Value",
>   "name": "emp_id",
>   "type": "string"
> }
>   ]
> }
> value_schema = avro.schema.parse(value_schema_str)
> schema_url = "http://host:port;
>
>
> How can I create Java Schema object from this schema string and pass it
> from python method ?
>
>
> Regards,
> Zerah
>
>
> On Wed, May 19, 2021 at 1:57 PM Dian Fu  wrote:
>
>> Hi Zerah,
>>
>> What’s the type of value_schema? It should be a Java object of type
>> Schema. From the exception, it seems that it’s a class instead of object.
>> Is this true?
>>
>> Regards,
>> Dian
>>
>> On May 19, 2021 at 3:41 PM, Zerah J wrote:
>>
>> Hi Dian,
>>
>> Thanks for your suggestion.
>>
>> I tried to invoke  ConfluentRegistryAvroDeserializationSchema.forGeneric
>> method from Python. But it's not working. Kindly check the code snippet
>> below :
>>
>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>>
>> def __init__(self, record_class: str = None, avro_schema_string:
>> schema = None, url: str = None):
>> JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>>
>> .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
>> j_deserialization_schema =
>> JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>
>> super(MyAvroRowDeserializationSchema,
>> self).__init__(j_deserialization_schema)
>>
>>
>> FlinkKafkaConsumer is now invoked as below using
>> MyAvroRowDeserializationSchema :
>>
>> value_schema = avro.schema.parse()
>> schema_url = "http://host:port;
>> deserialization_schema =
>> MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>> kafka_source = FlinkKafkaConsumer(
>> topics='my_topic',
>> deserialization_schema=deserialization_schema,
>> properties={'bootstrap.servers': 'host:port', 'group.id':
>> 'test_group'})
>>
>> I'm getting the below error :
>>
>> Traceback (most recent call last):
>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>> deserialization_schema =
>> MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>> j_deserialization_schema =
>> JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>   File
>> "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line
>> 1277, in __call__
>> args_command, temp_args = self._build_args(*args)
>>   File
>> "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line
>> 1247, in _build_args
>> [get_command_part(arg, self.pool) for arg in new_args])
>>   File
>> "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line
>> 1247, in 
>> [get_command_part(arg, self.pool) for arg in new_args])
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py",
>> line 298, in get_command_part
>> command_part = REFERENCE_TYPE + parameter._get_object_id()
>> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>>
>>
>>
>> Please suggest how this method should be called. Here the schema used is
>> avro schema.
>>
>> Regards,
>> Zerah
>>
>> On Mon, May 17, 2021 at 3:17 PM Dian Fu  wrote:
>>
>>> Hi Zerah,
>>>
>>> I guess you could provide a Python implementation for
>>> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper
>>> for the Java implementation and so it’s will be very easy to implement. You
>>> could take a look at AvroRowDeserializationSchema [1] as an example.
>>>
>>> Regards,
>>> Dian
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>>>
>>> > On May 17, 2021 at 5:35 PM, Zerah J wrote:
>>> >
>>> > Hi,
>>> >
>>> > I have below use case
>>> >
>>> > 1. Read streaming data from Kafka topic using Flink Python API
>>> > 2. Apply transformations on the data stream
>>> > 

[Statefun] Truncated Messages in Python workers

2021-05-19 Thread Jan Brusch

Hi,

recently we started seeing the following faulty behaviour in the Flink 
Stateful Functions HTTP communication towards external Python workers. 
This is only occuring when the system is under heavy load.


The Java Application will send HTTP Messages to an external Python 
Function but the external Function fails to parse the message with a 
"Truncated Message Error". Printouts show that the truncated message 
looks as follows:


--



my.protobuf.MyClass: 

my.protobuf.MyClass: 

my.protobuf.MyClass: 

my.protobuf.MyClass: 

Either the sender or the receiver (or something in between) seems to be 
truncating some (not all) messages at some random point in the payload. 
The source code in both Flink SDKs looks to be correct. We temporarily 
solved this by setting the "maxNumBatchRequests" parameter in the 
external function definition really low. But this is not an ideal 
solution as we believe this adds considerable communication overhead 
between the Java and the Python Functions.


The Stateful Function version is 2.2.2, java8. The Java App as well as 
the external Python workers are deployed in the same kubernetes cluster.



Has anyone ever seen this problem before?

Best regards

Jan

--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501



Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Dian Fu
Hi Zerah,

You could try to replace
```
value_schema = avro.schema.parse(value_schema_str)
```

with the following code:
```
JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
value_schema = JSchemaParser().parse(value_schema_str)
```

The reason is that ```value_schema = avro.schema.parse(...)``` will create a Python object instead of a Java object.

Regards,
Dian

> On May 19, 2021 at 5:23 PM, Zerah J wrote:
> 
> Hi Dian,
> 
> Type of value_schema is <class 'avro.schema.RecordSchema'>
> 
> I have only a Json schema string and schema registry url. Please find below 
> snippet :
> 
> import avro.schema
> 
> value_schema_str = """
> {   
>   "namespace": "com.nextgen.customer",
>   "type": "record",
>   "name": "employee",   
>   "doc": "Customer Details Value Schema.",
>   "fields": [
> {
>   "doc": "String Value",
>   "name": "emp_name",
>   "type": "string"
> },
> {
>   "doc": "String Value",
>   "name": "emp_id",
>   "type": "string"
> }
>   ]
> }
> value_schema = avro.schema.parse(value_schema_str) 
> schema_url = "http://host:port;
> 
> 
> How can I create Java Schema object from this schema string and pass it from 
> python method ?
> 
> 
> Regards,
> Zerah
> 
> 
> On Wed, May 19, 2021 at 1:57 PM Dian Fu  > wrote:
> Hi Zerah,
> 
> What’s the type of value_schema? It should be a Java object of type Schema. 
> From the exception, it seems that it’s a class instead of object. Is this 
> true?
> 
> Regards,
> Dian
> 
>> On May 19, 2021 at 3:41 PM, Zerah J wrote:
>> 
>> Hi Dian,
>> 
>> Thanks for your suggestion.
>> 
>> I tried to invoke  ConfluentRegistryAvroDeserializationSchema.forGeneric 
>> method from Python. But it's not working. Kindly check the code snippet 
>> below :
>> 
>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>> 
>> def __init__(self, record_class: str = None, avro_schema_string: schema 
>> = None, url: str = None):
>> JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>> 
>> .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
>> j_deserialization_schema = 
>> JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>> 
>> super(MyAvroRowDeserializationSchema, 
>> self).__init__(j_deserialization_schema)
>> 
>> 
>> FlinkKafkaConsumer is now invoked as below using 
>> MyAvroRowDeserializationSchema :
>> 
>> value_schema = avro.schema.parse() 
>> schema_url = "http://host:port "
>> deserialization_schema = 
>> MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>> kafka_source = FlinkKafkaConsumer(
>> topics='my_topic',
>> deserialization_schema=deserialization_schema,
>> properties={'bootstrap.servers': 'host:port', 'group.id 
>> ': 'test_group'})
>> 
>> I'm getting the below error :
>> 
>> Traceback (most recent call last):
>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>> deserialization_schema = 
>> MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>> j_deserialization_schema = 
>> JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", 
>> line 1277, in __call__
>> args_command, temp_args = self._build_args(*args)
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", 
>> line 1247, in _build_args
>> [get_command_part(arg, self.pool) for arg in new_args])
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", 
>> line 1247, in 
>> [get_command_part(arg, self.pool) for arg in new_args])
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 
>> 298, in get_command_part
>> command_part = REFERENCE_TYPE + parameter._get_object_id()
>> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>> 
>> 
>> 
>> Please suggest how this method should be called. Here the schema used is 
>> avro schema.
>> 
>> Regards,
>> Zerah
>> 
>> On Mon, May 17, 2021 at 3:17 PM Dian Fu > > wrote:
>> Hi Zerah,
>> 
>> I guess you could provide a Python implementation for 
>> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper 
>> for the Java implementation and so it’s will be very easy to implement. You 
>> could take a look at AvroRowDeserializationSchema [1] as an example.
>> 
>> Regards,
>> Dian
>> 
>> [1] 
>> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>>  
>> 
>> 
>> > On May 17, 2021 at 5:35 PM, Zerah J wrote:
>> > 
>> > 

??????????????queryable-state.proxy.ports????????

2021-05-19 Thread cao.j
??taskManager??queryable-state.proxy.ports??0??proxy

Re:

2021-05-19 Thread Jake
Hi, vtygoss


You can check out the official demo[1]

```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings
.newInstance()
//.inStreamingMode()
.inBatchMode()
.build()

val tEnv = TableEnvironment.create(setting)
```

Regards


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/common/
  


> On May 19, 2021, at 18:01, vtygoss  wrote:
> 
> 
> Hi,
> 
> I have below use case
> 
> Insert bounded data into dynamic table(upsert-kafka) using Flink 1.12 on 
> yarn, but  yarn application is still running when insert job finished, and 
> yarn container is not released.
> 
> I try to use BatchTableEnvironment, but “Primary key and unique key are not 
> supported yet”; i try to use 
> StreamingExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH), but 
> it not works.
> 
> Please help to offer some advice. 
> 
> Regards
> 
> 
> ```
> [test case code]
> val (senv, btenv) = FlinkSession.getOrCreate()
> val table = btenv.fromValues(
>   Row.ofKind(RowKind.INSERT, "1"),
>   Row.ofKind(RowKind.INSERT, "2")).select("f0")
> 
> btenv.createTemporaryView("bound", table)
> btenv.executeSql(s"create table if not exists test_result(" +
>   "id string, PRIMARY KEY(id) NOT ENFORCED) WITH(" +
>   
> s"'connector'='kafka','topic'='test_result','properties.bootstrap.servers'='${KAFKA_SERVER}',"
>  +
>   s"'key.format'='json','value.format'='json')")
> btenv.executeSql("insert into test_result select f0 from bound")
> 
> ```
> 
>  
> 
> 
> 



[no subject]

2021-05-19 Thread vtygoss
Hi,


I have below use case


Insert bounded data into a dynamic table (upsert-kafka) using Flink 1.12 on YARN, 
but the YARN application is still running after the insert job finishes, and the 
YARN containers are not released.


I tried to use BatchTableEnvironment, but got "Primary key and unique key are not 
supported yet"; I also tried 
StreamingExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH), but 
it does not work.


Please help to offer some advice. 


Regards




```
[test case code]
val (senv, btenv) = FlinkSession.getOrCreate()
val table = btenv.fromValues(
 Row.ofKind(RowKind.INSERT, "1"),
 Row.ofKind(RowKind.INSERT, "2")).select("f0")

btenv.createTemporaryView("bound", table)
btenv.executeSql(s"create table if not exists test_result(" +
 "id string, PRIMARY KEY(id) NOT ENFORCED) WITH(" +
 
s"'connector'='kafka','topic'='test_result','properties.bootstrap.servers'='${KAFKA_SERVER}',"
 +
 s"'key.format'='json','value.format'='json')")
btenv.executeSql("insert into test_result select f0 from bound")


```

Re:Re:Re: Garbled Chinese characters when Flink SQL writes to MySQL

2021-05-19 Thread Michael Ran



Check the character encoding of the database columns.




On 2021-05-18 18:19:31, "casel.chen" wrote:
>My JDBC URL already uses useUnicode=true&characterEncoding=UTF-8, but the result is still garbled.
>
>
>On 2021-05-18 17:21:12, "王炳焱" <15307491...@163.com> wrote:
>>When you define the MySQL sink table in Flink SQL, configure the URL as
>>jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8, like this:
>>CREATE TABLE jdbc_sink (
>>  id INT COMMENT 'order id',
>>  goods_name VARCHAR(128) COMMENT 'goods name',
>>  price DECIMAL(32,2) COMMENT 'goods price',
>>  user_name VARCHAR(64) COMMENT 'user name'
>>) WITH (
>>  'connector' = 'jdbc',
>>  'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',
>>  'username' = 'mysqluser',
>>  'password' = 'mysqluser',
>>  'table-name' = 'jdbc_sink'
>>)
>>On 2021-05-18 11:55:46, "casel.chen" wrote:
>>>My Flink SQL job is as follows:
>>>
>>>
>>>SELECT
>>>product_name,
>>>window_start,
>>>window_end,
>>>CAST(SUM(trans_amt) AS DECIMAL(24,2)) trans_amt,
>>>CAST(COUNT(order_no) AS BIGINT) trans_cnt,
>>>-- LOCALTIMESTAMP AS insert_time,
>>>'微支付事业部' AS bus_name
>>>FROM(
>>>
>>>
>>>The MySQL sink table is defined as follows:
>>>CREATE TABLE XXX (
>>>) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;
>>>
>>>
>>>After the job runs, the data written to the MySQL table contains garbled Chinese characters (??).
>>>
>>>
>>>
>>>Looking at the job's runtime logs, I found it uses the UTF-16LE character set. Is there any way to make it use utf8mb4?
>>>2021-05-17 18:02:25,010 INFO 
>>>org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task 
>>>GroupAggregate(groupBy=[product_name, window_start, window_end], 
>>>select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS 
>>>$f3, COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) AS 
>>>product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'-MM-dd 
>>>HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT 
>>>_UTF-16LE'-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS trans_amt, 
>>>CAST($f4) AS trans_cnt, CAST(()) AS insert_time, 
>>>_UTF-16LE'??':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
>>>AS bus_name]) -> Sink: 
>>>Sink(table=[default_catalog.default_database.all_trans_5m_new], 
>>>fields=[product_name, window_start, window_end, trans_amt, trans_cnt, 
>>>insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy 
>>>into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
>>>2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - 
>>>GroupAggregate(groupBy=[product_name, window_start, window_end, id, 
>>>data_type, mer_cust_id, order_no, trans_date], select=[product_name, 
>>>window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date, 
>>>MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, 
>>>window_start, window_end, trans_amt, order_no]) (1/1)#0 
>>>(ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.


Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Sumeet Malhotra
Thanks Xingbo! The workaround will probably work for now, at least it
avoids having to refer to index values in the rest of the function.

Cheers,
Sumeet


On Wed, May 19, 2021 at 3:02 PM Xingbo Huang  wrote:

> Hi Sumeet,
>
> Due to the limitation of the original PyFlink serializers design, there is
> no way to pass attribute names to Row in row-based operations. In
> release-1.14, I am reconstructing the implementations of serializers[1].
> After completion, accessing attribute names of `Row` in row-based
> operations will be supported[2].
>
> About the work around way in releases-1.13, maybe you need to manually set
> the field_names of Row. e.g.
> ```
> def my_table_tranform_fn(x: Row):
> x.set_field_names(['a', 'b', 'c'])
> ...
> ```
>
> [1] https://issues.apache.org/jira/browse/FLINK-22612
> [2] https://issues.apache.org/jira/browse/FLINK-22712
>
> Best,
> Xingbo
>
On Wed, May 19, 2021 at 4:45 PM Sumeet Malhotra wrote:
>
>> Hi,
>>
>> According to the documentation for PyFlink Table row based operations
>> [1], typical usage is as follows:
>>
>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
>> def split(x: Row) -> Row:
>> for s in x[1].split(","):
>> yield x[0], s
>>
>> table.flat_map(split)
>>
>> Is there any way that row fields inside the UDTF can be accessed by
>> their attribute names instead of array index? In my use case, I'm doing the
>> following:
>>
>> raw_data = t_env.from_path('MySource')
>> raw_data \
>> .join_lateral(my_table_tranform_fn(raw_data).alias('a', 'b', 'c') \
>> .flat_map(my_flat_map_fn).alias('x', 'y', 'z') \
>> .execute_insert("MySink")
>>
>> In the table function `my_flat_map_fn` I'm unable to access the fields
>> of the row by their attribute names i.e., assuming the input argument to
>> the table function is x, I cannot access fields as x.a, x.b or x.c, instead
>> I have use use x[0], x[1] and x[2]. The error I get is the _fields is not
>> populated.
>>
>> In my use case, the number of columns is very high and working with
>> indexes is so much error prone and unmaintainable.
>>
>> Any suggestions?
>>
>> Thanks,
>> Sumeet
>>
>>


Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Xingbo Huang
Hi Sumeet,

Due to the limitation of the original PyFlink serializers design, there is
no way to pass attribute names to Row in row-based operations. In
release-1.14, I am reconstructing the implementations of serializers[1].
After completion, accessing attribute names of `Row` in row-based
operations will be supported[2].

About the work around way in releases-1.13, maybe you need to manually set
the field_names of Row. e.g.
```
def my_table_tranform_fn(x: Row):
x.set_field_names(['a', 'b', 'c'])
...
```

[1] https://issues.apache.org/jira/browse/FLINK-22612
[2] https://issues.apache.org/jira/browse/FLINK-22712

Best,
Xingbo

On Wed, May 19, 2021 at 4:45 PM Sumeet Malhotra wrote:

> Hi,
>
> According to the documentation for PyFlink Table row based operations [1],
> typical usage is as follows:
>
> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
> def split(x: Row) -> Row:
> for s in x[1].split(","):
> yield x[0], s
>
> table.flat_map(split)
>
> Is there any way that row fields inside the UDTF can be accessed by
> their attribute names instead of array index? In my use case, I'm doing the
> following:
>
> raw_data = t_env.from_path('MySource')
> raw_data \
> .join_lateral(my_table_tranform_fn(raw_data).alias('a', 'b', 'c') \
> .flat_map(my_flat_map_fn).alias('x', 'y', 'z') \
> .execute_insert("MySink")
>
> In the table function `my_flat_map_fn` I'm unable to access the fields of
> the row by their attribute names i.e., assuming the input argument to the
> table function is x, I cannot access fields as x.a, x.b or x.c, instead I
> have use use x[0], x[1] and x[2]. The error I get is the _fields is not
> populated.
>
> In my use case, the number of columns is very high and working with
> indexes is so much error prone and unmaintainable.
>
> Any suggestions?
>
> Thanks,
> Sumeet
>
>


Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Hi Dian,

Type of value_schema is <class 'avro.schema.RecordSchema'>

I have only a Json schema string and schema registry url. Please find below
snippet :

import avro.schema

value_schema_str = """
{
  "namespace": "com.nextgen.customer",
  "type": "record",
  "name": "employee",
  "doc": "Customer Details Value Schema.",
  "fields": [
{
  "doc": "String Value",
  "name": "emp_name",
  "type": "string"
},
{
  "doc": "String Value",
  "name": "emp_id",
  "type": "string"
}
  ]
}
"""
value_schema = avro.schema.parse(value_schema_str)
schema_url = "http://host:port"


How can I create Java Schema object from this schema string and pass it
from python method ?


Regards,
Zerah


On Wed, May 19, 2021 at 1:57 PM Dian Fu  wrote:

> Hi Zerah,
>
> What’s the type of value_schema? It should be a Java object of type
> Schema. From the exception, it seems that it’s a class instead of object.
> Is this true?
>
> Regards,
> Dian
>
> On May 19, 2021 at 3:41 PM, Zerah J wrote:
>
> Hi Dian,
>
> Thanks for your suggestion.
>
> I tried to invoke  ConfluentRegistryAvroDeserializationSchema.forGeneric
> method from Python. But it's not working. Kindly check the code snippet
> below :
>
> class MyAvroRowDeserializationSchema(DeserializationSchema):
>
> def __init__(self, record_class: str = None, avro_schema_string:
> schema = None, url: str = None):
> JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>
> .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
> j_deserialization_schema =
> JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>
> super(MyAvroRowDeserializationSchema,
> self).__init__(j_deserialization_schema)
>
>
> FlinkKafkaConsumer is now invoked as below using
> MyAvroRowDeserializationSchema :
>
> value_schema = avro.schema.parse()
> schema_url = "http://host:port;
> deserialization_schema =
> MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
> kafka_source = FlinkKafkaConsumer(
> topics='my_topic',
> deserialization_schema=deserialization_schema,
> properties={'bootstrap.servers': 'host:port', 'group.id':
> 'test_group'})
>
> I'm getting the below error :
>
> Traceback (most recent call last):
>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
> deserialization_schema =
> MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
> j_deserialization_schema =
> JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py",
> line 1277, in __call__
> args_command, temp_args = self._build_args(*args)
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py",
> line 1247, in _build_args
> [get_command_part(arg, self.pool) for arg in new_args])
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py",
> line 1247, in 
> [get_command_part(arg, self.pool) for arg in new_args])
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py",
> line 298, in get_command_part
> command_part = REFERENCE_TYPE + parameter._get_object_id()
> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>
>
>
> Please suggest how this method should be called. Here the schema used is
> avro schema.
>
> Regards,
> Zerah
>
> On Mon, May 17, 2021 at 3:17 PM Dian Fu  wrote:
>
>> Hi Zerah,
>>
>> I guess you could provide a Python implementation for
>> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper
>> for the Java implementation and so it’s will be very easy to implement. You
>> could take a look at AvroRowDeserializationSchema [1] as an example.
>>
>> Regards,
>> Dian
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>>
>> > On May 17, 2021 at 5:35 PM, Zerah J wrote:
>> >
>> > Hi,
>> >
>> > I have below use case
>> >
>> > 1. Read streaming data from Kafka topic using Flink Python API
>> > 2. Apply transformations on the data stream
>> > 3. Write back to different kafka topics based on the incoming data
>> >
>> > Input data is coming from Confluent Avro Producer. By using the
>> existing pyflink.common.serialization.AvroRowDeserializationSchema, I'm
>> unable to deserialize the data.
>> >
>> > Please help to process the data as
>> ConfluentRegistryAvroDeserializationSchema is not available in the Python
>> API.
>> >
>> > Regards,
>> > Zerah
>>
>>
>


Re: When will Flink support reading and writing ACID Hive tables?

2021-05-19 Thread Rui Li
Hi,

Flink currently has no plan to support Hive ACID tables. The current hive connector
code cannot guarantee ACID semantics, so even if you remove the "Reading or writing
ACID table %s is not supported" check, you will not get the expected result.
Have you considered migrating the ACID tables into a data lake? For example,
Iceberg provides a corresponding migration tool [1].

[1] https://iceberg.apache.org/spark-procedures/#table-migration

On Wed, May 19, 2021 at 1:16 PM youngysh  wrote:

> hi
>
>
>  When we use Flink 1.12 to read an ACID Hive table we get the error "Reading or
> writing ACID table %s is not supported". We tried modifying the source code to
> lift this restriction, but that leads to further errors, such as a failing cast
> from BytesColumnVector to LongColumnVector.
>  Background: in production we want to use Flink for ETL and data-migration work.
> The corresponding Hive versions are around Hive 3.0 or Hive 2.3.6, the tables are
> ACID by default, and the data volume is large. If Flink only supports reading
> non-ACID tables, rebuilding all of the Hive tables would be very costly.
> Which Flink version plans to support reading ACID Hive tables? Or is there
> currently any way to work around this problem?



-- 
Best regards!
Rui Li


Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Sumeet Malhotra
Hi,

According to the documentation for PyFlink Table row based operations [1],
typical usage is as follows:

@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
for s in x[1].split(","):
yield x[0], s

table.flat_map(split)

Is there any way that row fields inside the UDTF can be accessed by
their attribute names instead of array index? In my use case, I'm doing the
following:

raw_data = t_env.from_path('MySource')
raw_data \
.join_lateral(my_table_tranform_fn(raw_data).alias('a', 'b', 'c') \
.flat_map(my_flat_map_fn).alias('x', 'y', 'z') \
.execute_insert("MySink")

In the table function `my_flat_map_fn` I'm unable to access the fields of
the row by their attribute names, i.e., assuming the input argument to the
table function is x, I cannot access fields as x.a, x.b or x.c; instead I
have to use x[0], x[1] and x[2]. The error I get is that `_fields` is not
populated.

In my use case the number of columns is very high, and working with indexes
is error prone and unmaintainable.
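
To make the pain concrete, this is roughly the shape the UDTF has to take today (a
sketch only; the field list, types and yielded values are placeholders): positions
have to be mapped back to names by hand via a hard-coded list that must stay in sync
with the upstream .alias(...).

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtf

# Hypothetical column order of the incoming row -- must be kept in sync with the
# upstream .alias('a', 'b', 'c') by hand, which is the maintenance burden above.
FIELD_NAMES = ['a', 'b', 'c']

@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()])
def my_flat_map_fn(row: Row):
    # Index-based access is the only option when row._fields is not populated,
    # so build a name -> value mapping manually before using the fields.
    fields = {name: row[i] for i, name in enumerate(FIELD_NAMES)}
    yield fields['a'], fields['b'], fields['c']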

Any suggestions?

Thanks,
Sumeet


Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Dian Fu
Hi Zerah,

What’s the type of value_schema? It should be a Java object of type Schema. 
From the exception, it seems that it’s a class instead of an object. Is this true?
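
If it is the Python RecordSchema returned by avro.schema.parse, one option (a rough,
untested sketch; it assumes the Avro classes are available on the JVM classpath of the
gateway) is to pass the schema JSON string through to the JVM and build a Java Schema
object there before calling forGeneric:

from pyflink.java_gateway import get_gateway

def to_java_avro_schema(avro_schema_json: str):
    # Build a *Java* org.apache.avro.Schema from the schema JSON string; this is the
    # type that ConfluentRegistryAvroDeserializationSchema.forGeneric expects, while
    # avro.schema.parse(...) returns a pure-Python object that py4j cannot pass along.
    gateway = get_gateway()
    return gateway.jvm.org.apache.avro.Schema.Parser().parse(avro_schema_json)

# Hypothetical usage inside the wrapper's __init__ (names as in your snippet):
#   j_schema = to_java_avro_schema(avro_schema_json_string)
#   j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(j_schema, url)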

Regards,
Dian

> On May 19, 2021, at 3:41 PM, Zerah J wrote:
> 
> Hi Dian,
> 
> Thanks for your suggestion.
> 
> I tried to invoke  ConfluentRegistryAvroDeserializationSchema.forGeneric 
> method from Python. But it's not working. Kindly check the code snippet below 
> :
> 
> class MyAvroRowDeserializationSchema(DeserializationSchema):
> 
> def __init__(self, record_class: str = None, avro_schema_string: schema = 
> None, url: str = None):
> JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
> 
> .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
> j_deserialization_schema = 
> JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
> 
> super(MyAvroRowDeserializationSchema, 
> self).__init__(j_deserialization_schema)
> 
> 
> FlinkKafkaConsumer is now invoked as below using 
> MyAvroRowDeserializationSchema :
> 
> value_schema = avro.schema.parse() 
> schema_url = "http://host:port"
> deserialization_schema = 
> MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
> kafka_source = FlinkKafkaConsumer(
> topics='my_topic',
> deserialization_schema=deserialization_schema,
> properties={'bootstrap.servers': 'host:port', 'group.id 
> ': 'test_group'})
> 
> I'm getting the below error :
> 
> Traceback (most recent call last):
>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
> deserialization_schema = 
> MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
> j_deserialization_schema = 
> JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", 
> line 1277, in __call__
> args_command, temp_args = self._build_args(*args)
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", 
> line 1247, in _build_args
> [get_command_part(arg, self.pool) for arg in new_args])
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", 
> line 1247, in <listcomp>
> [get_command_part(arg, self.pool) for arg in new_args])
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 
> 298, in get_command_part
> command_part = REFERENCE_TYPE + parameter._get_object_id()
> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
> 
> 
> 
> Please suggest how this method should be called. Here the schema used is avro 
> schema.
> 
> Regards,
> Zerah
> 
> On Mon, May 17, 2021 at 3:17 PM Dian Fu wrote:
> Hi Zerah,
> 
> I guess you could provide a Python implementation for 
> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper for 
> the Java implementation and so it will be very easy to implement. You could 
> take a look at AvroRowDeserializationSchema [1] as an example.
> 
> Regards,
> Dian
> 
> [1] 
> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>  
> 
> 
> > On May 17, 2021, at 5:35 PM, Zerah J wrote:
> > 
> > Hi,
> > 
> > I have below use case 
> > 
> > 1. Read streaming data from Kafka topic using Flink Python API
> > 2. Apply transformations on the data stream
> > 3. Write back to different kafka topics based on the incoming data
> > 
> > Input data is coming from Confluent Avro Producer. By using the existing 
> > pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to 
> > deserialize the data. 
> > 
> > Please help to process the data as 
> > ConfluentRegistryAvroDeserializationSchema is not available in the Python 
> > API.
> > 
> > Regards,
> > Zerah
> 



Re: Root Exception can not be shown on Web UI in Flink 1.13.0

2021-05-19 Thread Matthias Pohl
Hi Gary,
Not sure whether you've seen my question in the Jira issue: would you be able
to share the overall JobManager/TaskManager logs with us? That would help
us understand the context a bit more on why no TaskManagerLocation was set.
Let's move any further correspondence into FLINK-22688 [1]

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-22688

On Wed, May 19, 2021 at 5:45 AM Gary Wu  wrote:

> Thanks! I have updated the detail and task manager log in
> https://issues.apache.org/jira/browse/FLINK-22688.
>
> Regards,
> -Gary
>
> On Tue, 18 May 2021 at 16:22, Matthias Pohl 
> wrote:
>
>> Sorry for not getting back earlier. I missed that thread. It looks like
>> a wrong assumption on our end. Hence, Yangze and Guowei are right. I'm
>> gonna look into the issue.
>>
>> Matthias
>>
>> On Fri, May 14, 2021 at 4:21 AM Guowei Ma  wrote:
>>
>>> Hi, Gary
>>>
>>> I think it might be a bug, so would you like to open a Jira for this?
>>> And could you share the exception in which the TaskManagerLocation is
>>> null? It might be very helpful to verify the cause.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Thu, May 13, 2021 at 10:36 AM Yangze Guo  wrote:
>>>
 Hi, it seems to be related to FLINK-22276. Thus, I'd involve Matthias
 to take a look.

 @Matthias My gut feeling is that not all executions that have failureInfo
 have been deployed?

 Best,
 Yangze Guo

 On Wed, May 12, 2021 at 10:12 PM Gary Wu  wrote:
 >
 > Hi,
 >
 > We have upgraded our Flink applications to 1.13.0, but we found that the
 > root exception cannot be shown on the Web UI; it fails with an internal
 > server error message. After opening the browser development console and
 > tracing the message, we found that there is an exception in the jobmanager:
 >
 > 2021-05-12 13:30:45,589 ERROR
 org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] -
 Unhandled exception.
 > java.lang.IllegalArgumentException: The location must not be null for
 a non-global failure.
 > at
 org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
 > at
 org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.assertLocalExceptionInfo(JobExceptionsHandler.java:218)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
 > at
 org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createRootExceptionInfo(JobExceptionsHandler.java:191)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
 > at
 java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
 ~[?:?]
 > at java.util.stream.SliceOps$1$1.accept(SliceOps.java:199) ~[?:?]
 > at
 java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1632)
 ~[?:?]
 > at
 java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
 ~[?:?]
 > at
 java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
 ~[?:?]
 > at
 java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488) 
 ~[?:?]
 > at
 java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
 ~[?:?]
 > at
 java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
 ~[?:?]
 > at
 java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) 
 ~[?:?]
 > at
 java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
 ~[?:?]
 > at
 org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionHistory(JobExceptionsHandler.java:169)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
 > at
 org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionsInfo(JobExceptionsHandler.java:154)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
 > at
 org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:101)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
 > at
 org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:63)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
 > at
 org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87)
 ~[flink-dist_2.12-1.13.0.jar:1.13.0]
 > at
 java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
 [?:?]
 > at
 java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
 [?:?]
 > at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 [?:?]
 > at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
 > at
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
 >

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Hi Dian,

Thanks for your suggestion.

I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric
method from Python, but it's not working. Kindly check the code snippet
below:

class MyAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, record_class: str = None, avro_schema_string: schema = None, url: str = None):
        JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
            .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
        super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)


FlinkKafkaConsumer is now invoked as below using MyAvroRowDeserializationSchema:

value_schema = avro.schema.parse()
schema_url = "http://host:port"
deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
kafka_source = FlinkKafkaConsumer(
    topics='my_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})

I'm getting the below error :

Traceback (most recent call last):
  File "flinkKafkaAvro.py", line 70, in datastream_api_demo
deserialization_schema =
MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
  File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
j_deserialization_schema =
JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py",
line 1277, in __call__
args_command, temp_args = self._build_args(*args)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py",
line 1247, in _build_args
[get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py",
line 1247, in <listcomp>
[get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line
298, in get_command_part
command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'RecordSchema' object has no attribute '_get_object_id'



Please suggest how this method should be called. Here the schema used is an
Avro schema.

Regards,
Zerah

On Mon, May 17, 2021 at 3:17 PM Dian Fu  wrote:

> Hi Zerah,
>
> I guess you could provide a Python implementation for
> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper
> for the Java implementation and so it will be very easy to implement. You
> could take a look at AvroRowDeserializationSchema [1] as an example.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>
> > On May 17, 2021, at 5:35 PM, Zerah J wrote:
> >
> > Hi,
> >
> > I have below use case
> >
> > 1. Read streaming data from Kafka topic using Flink Python API
> > 2. Apply transformations on the data stream
> > 3. Write back to different kafka topics based on the incoming data
> >
> > Input data is coming from Confluent Avro Producer. By using the existing
> pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to
> deserialize the data.
> >
> > Please help to process the data as
> ConfluentRegistryAvroDeserializationSchema is not available in the Python
> API.
> >
> > Regards,
> > Zerah
>
>


Re: DataStream Batch Execution Mode and large files.

2021-05-19 Thread Marco Villalobos
Thank you very much. You've been very helpful.

Since my intermediate results are large, I suspect that io.tmp.dirs must
literally be on the local file system. Thus, since I use EMR, I'll need to
configure EBS to support more data.

On Tue, May 18, 2021 at 11:08 PM Yun Gao  wrote:

> Hi Marco,
>
> With BATCH mode, all the ALL_TO_ALL edges are marked as blocking
> and use intermediate files to transfer data. Flink now supports hash
> shuffle
> and sort shuffle for blocking edges [1]; both of them store the
> intermediate files in
> the directories configured by io.tmp.dirs [2].
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/batch/blocking_shuffle/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#io-tmp-dirs
>
> --Original Mail --
> *Sender:*Marco Villalobos 
> *Send Date:*Wed May 19 09:50:45 2021
> *Recipients:*user 
> *Subject:*DataStream Batch Execution Mode and large files.
>
>> Hi,
>>
>> I am using the DataStream API in Batch Execution Mode, and my "source" is
>> an s3 Buckets with about 500 GB of data spread across many files.
>>
>> Where does Flink store the results of processed / produced data between
>> tasks?
>>
>> There is no way that 500GB will fit in memory.  So I am very curious how
>> that happens.
>>
>> Can somebody please explain?
>>
>> Thank you.
>>
>> Marco A. Villalobos
>>
>


flink all events getting dropped as late

2021-05-19 Thread Debraj Manna
Crossposting from stackoverflow


My flink pipeline looks like below

WatermarkStrategy watermarkStrategy = WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(900))
        .withTimestampAssigner((metric, timestamp) -> {
            logger.info("ETS: mts: {}, ts: {}", metric.metricPoint.timeInstant, timestamp);
            return metric.metricPoint.timeInstant;
        });

metricStream = kafkasource
        .process(<>)
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .transform("debugFilter", TypeInformation.of(<>), new StreamWatermarkDebugFilter<>("Op"))
        .filter(<>)
        .map(<>)
        .flatMap(<>)
        .keyBy(<>)
        .window(TumblingEventTimeWindows.of(Time.seconds(300)))
        .allowedLateness(Time.seconds(900))
        .sideOutputLateData(lateOutputTag)
        .aggregate(AggregateFunction, ProcessWindowFunction)
        .addSink()

I am running with parallelism 1 and the default setAutoWatermarkInterval of 200
ms. I did not set setStreamTimeCharacteristic since, from Flink 1.12 onwards,
event time is the default.

I can see from the output of StreamWatermarkDebugFilter that the watermark
is progressing, but all the events are getting marked as late and are being
gathered at lateOutputTag.

2021-05-18 17:14:19,745 INFO  - ETS: mts:
162131010, ts: 1621310582271
2021-05-18 17:14:19,745 INFO  - ETS: mts:
162131010, ts: 1621310582271
2021-05-18 17:14:19,842 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162130949
2021-05-18 17:14:19,944 INFO  - ETS: mts:
162130980, ts: 1621310582275
2021-05-18 17:14:19,944 INFO  - ETS: mts:
162130980, ts: 1621310582275
...
2021-05-18 17:14:20,107 INFO  - ETS: mts:
162131038, ts: 1621310582278
2021-05-18 17:14:20,107 INFO  - ETS: mts:
162131038, ts: 1621310582278
2021-05-18 17:14:20,137 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162130977
2021-05-18 17:14:20,203 INFO  - ETS: mts:
162130980, ts: 1621310582279
...
2021-05-18 17:17:47,839 INFO  - ETS: mts:
162131010, ts: 1621310681159
2021-05-18 17:17:47,848 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162131009
2021-05-18 17:17:47,958 INFO  - ETS: mts:
162130980, ts: 1621310681237
2021-05-18 17:17:47,958 INFO  - ETS: mts:
162130980, ts: 1621310681237
...
2021-05-18 17:22:24,207 INFO  - ETS: mts:
162131010, ts: 1621310703622
2021-05-18 17:22:24,229 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162131039
2021-05-18 17:22:24,315 INFO  - ETS: mts:
162130980, ts: 1621310705177
2021-05-18 17:22:24,315 INFO  - ETS: mts:
162130980, ts: 1621310705177

I have seen this discussion and it is not an idleness problem.

It looks related to this discussion.
Can someone suggest how I can debug this problem further?


Re: DataStream Batch Execution Mode and large files.

2021-05-19 Thread Yun Gao
Hi Marco,

With BATCH mode, all the ALL_TO_ALL edges are marked as blocking
and use intermediate files to transfer data. Flink now supports hash shuffle
and sort shuffle for blocking edges [1]; both of them store the intermediate
files in the directories configured by io.tmp.dirs [2].


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/batch/blocking_shuffle/
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#io-tmp-dirs
 --Original Mail --
Sender:Marco Villalobos 
Send Date:Wed May 19 09:50:45 2021
Recipients:user 
Subject:DataStream Batch Execution Mode and large files.

Hi,

I am using the DataStream API in Batch Execution Mode, and my "source" is an s3 
Buckets with about 500 GB of data spread across many files.

Where does Flink store the results of processed / produced data between tasks?

There is no way that 500GB will fit in memory.  So I am very curious how that 
happens.

Can somebody please explain?

Thank you.

Marco A. Villalobos

Re: DataStream API Batch Execution Mode restarting...

2021-05-19 Thread Marco Villalobos
Thank you.  I used the default restart strategy.  I'll change that.

On Tue, May 18, 2021 at 11:02 PM Yun Gao  wrote:

> Hi Marco,
>
> Have you configured the restart strategy? If restart-strategy [1] is
> configured
> to a strategy other than none, Flink should be able to restart the
> job automatically
> on failover. The restart strategy can also be configured via
> StreamExecutionEnvironment#setRestartStrategy.
>
> If no restart strategy is configured (the default behavior), the job
> fails and we would
> need to re-submit the job to execute it from scratch.
>
> Best,
> Yun
>
>
>
> --Original Mail --
> *Sender:*Marco Villalobos 
> *Send Date:*Wed May 19 11:27:37 2021
> *Recipients:*user 
> *Subject:*DataStream API Batch Execution Mode restarting...
>
>> I have a DataStream running in Batch Execution mode within YARN on EMR.
>> My job failed an hour into the job two times in a row because the task
>> manager heartbeat timed out.
>>
>> Can somebody point me out how to restart a job in this situation? I can't
>> find that section of the documentation.
>>
>> thank you.
>>
>


Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Marco Villalobos
Questions Flink DataStream in BATCH execution mode scalability advice.

Here is the problem that I am trying to solve.

Input is an S3 bucket directory with about 500 GB of data across many
files. The instance that I am running on only has 50GB of EBS storage. The
nature of this data is time series data. Imagine name, value, timestamp.

I must average time_series.value by time_series.name over a tumbling
window of 15 minutes. Upon aggregation, the time_series.timestamp gets
rounded up to the quarter. I key by tag name and 15-minute interval.

After aggregation, I must forward-fill the missing quarters for each
time_series.name. Currently, this forward-fill operator is keyed only by
time_series.name. Does this mean that in batch mode, all of the time series
with the same time_series.name within the 500 GB of files must fit in
memory?

The results are saved in a rdbms.

If this job somehow reads all 500 GB before it sends it to the first
operator, where is the data stored?

Now, considering that the EMR node only has 50 GB of EBS (that's disk
storage), is there a means to configure Flink to store its intermediate
results within S3?

When the job failed, I saw this exception in the log: "Recovery is
suppressed by NoRestartBackoffTimeStrategy." Is there a way to configure
this to recover?

My job keeps failing for the same reason; it says, "The heartbeat of
TaskManager with id container_xxx timed out." Is there a way to configure
it not to time out?

I would appreciate any advice on how I should solve these problems. Thank
you.


Re: DataStream API Batch Execution Mode restarting...

2021-05-19 Thread Yun Gao
Hi Marco,

Have you configured the restart strategy? If restart-strategy [1] is configured
to a strategy other than none, Flink should be able to restart the job
automatically on failover. The restart strategy can also be configured via
StreamExecutionEnvironment#setRestartStrategy.

If no restart strategy is configured (the default behavior), the job fails and
we would need to re-submit the job to execute it from scratch.
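
For example, a minimal sketch of the second option (shown here with the PyFlink API;
the Java/Scala DataStream API has the matching setRestartStrategy call, and the attempt
count and delay below are arbitrary placeholders):

from pyflink.common.restart_strategy import RestartStrategies
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Restart the job up to 3 times on failure, waiting 10000 ms between attempts.
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(3, 10000))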

Best,
Yun




 --Original Mail --
Sender:Marco Villalobos 
Send Date:Wed May 19 11:27:37 2021
Recipients:user 
Subject:DataStream API Batch Execution Mode restarting...

I have a DataStream running in Batch Execution mode within YARN on EMR.
My job failed an hour into the job two times in a row because the task manager 
heartbeat timed out.

Can somebody point me out how to restart a job in this situation? I can't find 
that section of the documentation.

thank you.

flink 1.13.0: how to specify the schema name (schema.name) when creating a table with Flink SQL

2021-05-19 Thread Asahi Lee
hi!
   With the Flink SQL JDBC connector, how do I specify the schema name of the table?
Is it done through the table-name option, for example:

CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'org.users'
);