Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-27 Thread Xingbo Huang
Hi Pierre,

Sorry for the late reply.
Your requirement is that your `Table` has a `field` in `Json` format whose number of 
keys has reached 100k, and you want to use such a `field` as the input/output of a 
`udf`, right? As to whether there is a limit on the number of nested keys, I am not 
sure; other contributors with experience in this area may have an answer. On the 
`Python UDF` side, if the key or value type of your `Map` is `Any`, that is not 
supported yet; you need to specify a concrete type. For more information, please 
refer to the related documentation[1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
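
As a rough illustration of "specify a concrete type" (shown here in Java rather than
Scala, purely as a sketch; the class and key names are made up), a UDF declared with
an explicit MAP type would look like the following, mirroring the
Map[String,String] => java.util.Map[String,String] => MAP conversion that is
summarized later in this thread:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.HashMap;
import java.util.Map;

public class DummyMapFunction extends ScalarFunction {

  // An explicit MAP type instead of Any, so the planner (and Python UDFs
  // consuming the result) know the concrete key/value types.
  @DataTypeHint("MAP<STRING, STRING>")
  public Map<String, String> eval() {
    Map<String, String> result = new HashMap<>();
    result.put("s", "foo");
    result.put("t", "bar");
    return result;
  }
}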

Best,
Xingbo

> On Nov 28, 2020, at 00:49, Pierre Oberholzer wrote:
> 
> Hello Wei, Dian, Xingbo,
> 
> Not really sure when it is appropriate to knock on the door of the community 
> ;)
> I just wanted to mention that your feedback on the above topic will be highly 
> appreciated, as it will determine the choice of framework on our side for the 
> months to come, and potentially help the community to cover sparse data with 
> Flink.
> 
> Thanks a lot !
> 
> Have a great week-end
> 
> Best,
> 
> On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer wrote:
> Hi Wei,
> 
> Thanks for the hint. May I please follow up by adding more context and asking 
> for your guidance?
> 
> In case the aforementioned Map[String,Any] object returned by Scala:
> 
> - Has a defined schema (incl. nested) with up to 100k (!) different possible 
> keys
> - Has only some portion of the keys populated for each record
> - Is convertible to JSON
> - Has to undergo downstream processing in Flink and/or Python UDF with key 
> value access
> - Has to be ultimately stored in a Kafka/AVRO sink
> 
> How would you declare the types explicitly in such a case ?
> 
> Thanks for your support !
> 
> Pierre
> 
> On Thu, Nov 19, 2020 at 03:54, Wei Zhong wrote:
> Hi Pierre,
> 
> Currently there is no type hint like ‘Map[String, Any]’. The recommended way 
> is to declare your type more explicitly.
> 
> If you insist on doing this, you can try declaring a RAW data type for 
> java.util.HashMap [1], but you may encounter some trouble [2] related to the 
> Kryo serializers.
> 
> Best,
> Wei
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>  
> 
> [2] 
> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>  
> 
> 
> 
>> On Nov 19, 2020, at 04:31, Pierre Oberholzer wrote:
>> 
>> Hi Wei,
>> 
>> It works ! Thanks a lot for your support.
>> I hadn't tried this last combination for option 1, and I had the wrong syntax 
>> for option 2.
>> 
>> So to summarize..
>> 
>> Methods working:
>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>> - Outdated: override getResultType in UDF definition + 
>> t_env.register_java_function for UDF registering
>> 
>> Type conversions working:
>> - scala.collection.immutable.Map[String,String] => 
>> org.apache.flink.types.Row => ROW
>> - scala.collection.immutable.Map[String,String] => 
>> java.util.Map[String,String] => MAP
>> 
>> Any hint for Map[String,Any] ?
>> 
>> Best regards,
>> 
>> On Wed, Nov 18, 2020 at 03:26, Wei Zhong wrote:
>> Hi Pierre,
>> 
>> Both approaches work on my local machine; this is my code:
>> 
>> Scala UDF:
>> package com.dummy
>> 
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.table.annotation.DataTypeHint
>> import org.apache.flink.table.api.Types
>> import org.apache.flink.table.functions.ScalarFunction
>> import org.apache.flink.types.Row
>> 
>> /**
>>   * The scala UDF.
>>   */
>> class dummyMap extends ScalarFunction {
>> 
>>   // If the udf is registered via a SQL statement, you need to add this type hint.
>>   @DataTypeHint("ROW<s STRING, t STRING>")
>>   def eval(): Row = {
>> 
>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>> 
>>   }
>> 
>>   // If the udf is registered via the method 'register_java_function', you need
>>   // to override this method.
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
>> = {
>> // The type of the return values should be TypeInformation
>> Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
>> Types.STRING()))
>>   }
>> }
>> Python code:
>> 
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment
>> 
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> st_env = StreamTableEnvironment.create(s_env)
>> 
>> # load the scala udf jar file, the path should be 

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Laurent Exsteens
Hi Leonard,


> From  my understanding, your case is not a pure deduplication case but
> want to both keep the previous record and current record, thus the
> deduplication query can not satisfy your requirement.
>

Indeed, that's what I came to realise during our discussion on this email
chain. I'm sorry if it caused confusion. I'm still not sure how to express
this requirement in a concise way: "the need to deduplicate but let
previous values come back after a different value has appeared"


> Keeping last row in Deduplication always produces a changelog stream,
> because we need to retract the previous last value and sent the new last
> value. You could use a connector that supports upsert sink like HBase, JDBC
> or upsert-kafka connector when sink a changelog stream, the kafka connector
> can only accept append-only stream and thus you got the message.
>

That's what I understood indeed. But in my case I really do want to insert
and not upsert.
Just for information: the goal is to be able to historize Kafka messages in
real time. Each message could potentially be split to store information
in multiple tables (in my example, name and address would be inserted into 2
different tables), and the history should be kept and enriched with the
ingestion date. The fact that the Kafka message can be split to be stored
in multiple tables creates that "deduplication" requirement (in my example
the address could have changed but not the name, and we don't want to add a
record with no business value to the table containing the names). And of
course, a field can be changed twice and as a result have the same value
again, and that's business information we do want to keep.
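
For what it's worth, this "emit a record only when the value actually changed for
that key" behaviour is straightforward outside SQL, using keyed state in the
DataStream API. A rough, hypothetical sketch (the Customer POJO and its field
names are made up for illustration):

// Hypothetical POJO matching the columns discussed in this thread.
class Customer {
  public int clientNumber;
  public String name;
  public String address;
}

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Forwards a record only if its address differs from the previously seen
// address for the same client_number, so "a", "b", "a" is preserved and the
// resulting stream stays append-only.
public class EmitOnAddressChange
    extends KeyedProcessFunction<Integer, Customer, Customer> {

  private transient ValueState<String> lastAddress;

  @Override
  public void open(Configuration parameters) {
    lastAddress = getRuntimeContext().getState(
        new ValueStateDescriptor<>("lastAddress", Types.STRING));
  }

  @Override
  public void processElement(Customer c, Context ctx, Collector<Customer> out)
      throws Exception {
    String prev = lastAddress.value();
    if (prev == null || !prev.equals(c.address)) {
      lastAddress.update(c.address);
      out.collect(c);
    }
  }
}

Wired in with stream.keyBy(c -> c.clientNumber).process(new EmitOnAddressChange()),
the output can then go to a plain append-only Kafka sink.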


> The LAG function is used in over-window aggregations and should work in
> your case, but unfortunately it looks like the LAG function is not implemented
> correctly; I created an issue[1] to fix this.
>

Thanks a lot! I'll follow the issue.
I would love to try to fix it... but quickly looking at that code, I'm not
sure it's the best way to start contributing. I don't understand what
should be changed in that code, let alone find what generated that code and
how it should be fixed...


In the meantime, I guess the only other option would be MATCH_RECOGNIZE.
Could you help me find what I did wrong in this query:

SELECT *
FROM customers
MATCH_RECOGNIZE (
PARTITION BY client_number
ORDER BY proctime()
MEASURES
B.client_number as client_number,
B.address as address
PATTERN (A* B)
DEFINE
B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
) as T;

I get the following error:
SQL validation failed. Index 0 out of bounds for length 0

Thanks a lot for your help!

Laurent.


>
>
> Best,
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-20405
>
> On Fri, 27 Nov 2020 at 03:28, Leonard Xu  wrote:
>
>> Hi, Laurent
>>
>> Basically, I need to deduplicate, *but only keeping in the deduplication
>> state the latest value of the changed column* to compare with. While
>> here it seems to keep all previous values…
>>
>>
>> You can use ` ORDER BY proctime() DESC`  in the deduplication query,  it
>> will keep last row, I think that’s what you want.
>>
>> BTW, the deduplication has supported event time in 1.12, this will be
>> available soon.
>>
>> Best,
>> Leonard
>>
>>
>
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
> Rue Emile Francqui, 4
> 1435 Mont-Saint-Guibert
> (T) +32 10 75 02 00
>
>
> *euranova.eu *
> *research.euranova.eu* 
>
> ♻ Be green, keep it on the screen
>
>
>

-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu *

*research.euranova.eu* 

-- 
♻ Be green, keep it on the screen


Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Konstantin Knauf
Hi Laurent,

With respect to Ververica Platform, we will support Flink 1.12 and add
"upsert-kafka" as a packaged connector in our next minor release which we
target for February.

Cheers,

Konstantin

On Thu, Nov 12, 2020 at 3:43 AM Jark Wu  wrote:

> Hi Laurent,
>
> 1. Deduplicate with keeping the first row will generate an append-only
> stream. But I guess you are expecting to keep the last row which generates
> an updating stream. An alternative way is you can
>  use the "changelog-json" format in this repo [1], it will convert the
> updating stream into append
> records with change flag encoded.
> 2. Yes. It will replace records with the same key, i.e. upsert statement.
> 3. I think it will be available in one or two months. There will be a
> first release candidate soon.
> You can watch on the dev ML. I'm not sure the plan of Ververica
> platform, cc @Konstantin Knauf 
>
> Best,
> Jark
>
> [1]:
> https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>
> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> Hi Jark,
>>
>> thanks for your quick reply. I was indeed expecting it.
>>
>> But that triggers the following questions:
>>
>>1. Is there another way to do this deduplication and generate an
>>append-only stream? Match Recognize? UDF? ...?
>>2. If I would put Postgres as a sink, what would happen? Will the
>>events happen or will they replace the record with the same key?
>>3. When will release-1.12 be available? And when would it be
>>integrated in the Ververica platform?
>>
>> Thanks a lot for your help!
>>
>> Best Regards,
>>
>> Laurent.
>>
>>
>>
>> On Wed, 11 Nov 2020 at 03:31, Jark Wu  wrote:
>>
>>> Hi Laurent,
>>>
>>> This is because the deduplicate node generates an updating stream,
>>> however Kafka currently only supports append-only stream.
>>> This can be addressed in release-1.12, because we introduce a new
>>> connector "upsert-kafka" which supports writing updating
>>>  streams into Kafka compacted topics.
>>>
>>> Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
>>> ConsumerRecord#timestamp()?
>>> If yes, this is also supported in release-1.12 via metadata syntax in
>>> DDL [1]:
>>>
>>> CREATE TABLE kafka_table (
>>>   id BIGINT,
>>>   name STRING,
>>>   timestamp BIGINT METADATA,  -- read timestamp
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'test-topic',
>>>   'format' = 'avro'
>>> )
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>>
>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>>> laurent.exste...@euranova.eu> wrote:
>>>
 Hello,

 I'm getting an error  in Flink SQL when reading from kafka,
 deduplicating records and sending them back to Kafka.

 The behavior I want is the following:

 *input:*
 | client_number | address |
 | --- | --- |
 | 1  | addr1 |
 | 1  | addr1 |
 | 1  | addr2 |
 | 1  | addr2 |
 | 1  | addr1 |
 | 1  | addr1 |

 *output:*
 | client_number | address |
 | --- | --- |
 | 1  | addr1 |
 | 1  | addr2 |
 | 1  | addr1 |

 The error seems to say that the type of stream created by the
 deduplication query is of "update & delete" type, while kafka only supports
 append-only:

 Unsupported query
 Table sink 'vvp.default.sat_customers_address' doesn't support
 consuming update and delete changes which is produced by node
 Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
 rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
 $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])


 --> Is there a way to create an append only query from this kind of
 deduplication query (see my code here below)?
 --> Would that work if I would use, say, a Postgres sink?

 Bonus question: can we extract the Kafka ingestion date using Flink
 SQL? (here I generated a processing date to allow ordering during
 deduplication)

 P.S.: I'm on the Ververica Platform, but I guess this error is linked
 to Flink SQL itself.

 Thanks in advance for your help.

 Best Regards,

 Laurent.

 ---
 -- Read from customers kafka topic
 ---
 CREATE TEMPORARY TABLE customers (
 `client_number` INT,
 `name` VARCHAR(100),
 `address` VARCHAR(100)
 )
 COMMENT ''
 WITH (
 'connector' = 'kafka',
 'format' = 'csv',
 'properties.bootstrap.servers' = 

Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Leonard Xu
Hi, Laurent
> 
> I'm not sure that would do what I want though. As far as I understand, the 
> deduplication query will always remember any values it has seen. So if I 
> have, for a specific primary key, the following values in another field: "a", 
> "a", "b", "b", "a", "a", the deduplication query could provide me with "a", 
> "b" as a result. But never with "a", "b", "a" (possibility to come back to a 
> previous value), which is what I need.

From my understanding, your case is not a pure deduplication case: you want to 
keep both the previous record and the current record, so the deduplication query 
cannot satisfy your requirement.


> Moreover, I tried putting proctime() DESC, and I get the message: The 
> submitted query is not an append-only query. Only queries producing 
> exclusively new rows over time are supported at the moment. I do want an 
> append only query.
Keeping the last row in deduplication always produces a changelog stream, because 
we need to retract the previous last value and send the new last value. When 
sinking a changelog stream you could use a connector that supports upsert, such as 
the HBase, JDBC or upsert-kafka connector; the plain kafka connector can only 
accept an append-only stream, and that is why you got the message.
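
To make the upsert-kafka option concrete, a sink declaration on Flink 1.12+ could
look roughly like the sketch below (Java Table API used only for illustration;
topic, servers and formats are placeholders, not taken from this thread):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSinkSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
    // upsert-kafka requires a PRIMARY KEY; updates are written as upserts
    // into a compacted topic instead of plain appends.
    tEnv.executeSql(
        "CREATE TABLE sat_customers_address (" +
        "  client_number INT," +
        "  address STRING," +
        "  PRIMARY KEY (client_number) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'upsert-kafka'," +
        "  'topic' = 'sat_customers_address'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'key.format' = 'csv'," +
        "  'value.format' = 'csv')");
  }
}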


> The LAG function makes complete sense to me here, since it would only compare 
> with the previous record. I just don't understand why it does not get the 
> value of the previous record, whatever offset I give it. Any idea what I 
> might be doing wrong?

The LAG function is used in over-window aggregations and should work in your 
case, but unfortunately it looks like the LAG function is not implemented 
correctly; I created an issue[1] to fix this.
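
Once that issue is fixed, the intended usage would be an OVER window along these
lines (sketch only, assuming an existing TableEnvironment tEnv and a customers
table exposing a processing-time attribute column named proc; untested while
FLINK-20405 is open):

// Keep a row only when the address differs from the previous row of the same key.
tEnv.executeSql(
    "SELECT client_number, address FROM (" +
    "  SELECT client_number, address, " +
    "    LAG(address, 1) OVER (PARTITION BY client_number ORDER BY proc) AS prev_address " +
    "  FROM customers" +
    ") WHERE prev_address IS NULL OR address <> prev_address");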

 
 
Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-20405

> On Fri, 27 Nov 2020 at 03:28, Leonard Xu  > wrote:
> Hi, Laurent
> 
>> Basically, I need to deduplicate, but only keeping in the deduplication 
>> state the latest value of the changed column to compare with. While here it 
>> seems to keep all previous values…
> 
> You can use ` ORDER BY proctime() DESC`  in the deduplication query,  it will 
> keep last row, I think that’s what you want.
> 
> BTW, the deduplication has supported event time in 1.12, this will be 
> available soon.
> 
> Best,
> Leonard
> 
> 
> 
> -- 
> Laurent Exsteens
> Data Engineer
> (M) +32 (0) 486 20 48 36
> 
> EURA NOVA
> Rue Emile Francqui, 4
> 1435 Mont-Saint-Guibert
> (T) +32 10 75 02 00 
> 
> euranova.eu 
> research.euranova.eu 
> ♻ Be green, keep it on the screen



Re: Re: Reply: Stateful operator fails to save checkpoint

2020-11-27 Thread 赵一旦
You didn't even include the failure reason. How exactly does it fail to save... a timeout? Or something else?

魏积乾 wrote on Fri, Nov 27, 2020 at 7:08 PM:

> flink-csv-1.11.2.jar
> flink-dist_2.11-1.11.2.jar
> flink-json-1.11.2.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-table_2.11-1.11.2.jar
> flink-table-blink_2.11-1.11.2.jar
> log4j-1.2-api-2.12.1.jar
> log4j-api-2.12.1.jar
> log4j-core-2.12.1.jar
> log4j-slf4j-impl-2.12.1.jar
> flink-metrics-prometheus_2.12-1.11.2.jar
>
> Sorted by time, these are the latest jars.
>
>
>
> Sent from my iPhone
>
>
> -- Original message --
> From: 王默  Sent: Nov 27, 2020, 18:41
> To: user-zh
> Subject: Reply: Re: Reply: Stateful operator fails to save checkpoint
>
>
>
>
>
>
>
>
>
>
> Could you tell me specifically which jar was not upgraded? Or does something need to be copied from opt into lib?
>
>
>
>
>
>
>
>
>
>
>
>
> At 2020-11-27 17:34:39, "魏积乾" wrote: I just ran into this as well after upgrading from 1.10. One job could save checkpoints while another kept failing; I then checked the jars under lib, found some had not been upgraded, and replaced them. I also changed state.checkpoints.dir in the configuration file.
> Hope that helps. Sent from my iPhone   --
> Original message -- From: 王默  Sent: Nov 27, 2020, 17:22  To: user-zh  Subject: Reply: Stateful operator fails to save checkpoint
>
>
>
> 


Flink job reports netty errors shortly after it starts running

2020-11-27 Thread 赵一旦
The errors are as follows:
19:59:56.128 [Flink Netty Client (8009) Thread 6] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exce
ption in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
19:59:56.214 [Flink Netty Client (8009) Thread 13] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exc
eption in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
19:59:56.254 [Flink Netty Client (8009) Thread 15] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exc
eption in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
19:59:56.262 [Flink Netty Client (8009) Thread 17] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exc
eption in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
19:59:56.270 [Flink Netty Client (8009) Thread 19] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exc
eption in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
19:59:56.279 [Flink Netty Client (8009) Thread 21] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exc
eption in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
19:59:56.310 [Flink Netty Client (8009) Thread 23] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exc
eption in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
19:59:56.320 [Flink Netty Client (8009) Thread 20] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exc
eption in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
19:59:56.325 [Flink Netty Client (8009) Thread 22] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exc
eption in the selector loop.
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
epoll_wait(..) failed: Operation now in progress
19:59:56.339 [Flink Netty Client (8009) Thread 25] WARN
 org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop -
Unexpected exc
eption in the selector loop.


Not sure what is going on.


Re: FlinkSQL kafka->dedup->kafka

2020-11-27 Thread Laurent Exsteens
Hi Leonard,

thank you for your answer.

I'm not sure that would do what I want though. As far as I understand, the
deduplication query will always remember any values it has seen. So if I
have, for a specific primary key, the following values in another field:
"a", "a", "b", "b", "a", "a", the deduplication query could provide me with
"a", "b" as a result. But never with "a", "b", "a" (possibility to come
back to a previous value), which is what I need.
Moreover, I tried putting proctime() DESC, and I get the message: The
submitted query is not an append-only query. Only queries producing
exclusively new rows over time are supported at the moment. I do want an
append only query.

The LAG function makes complete sense to me here, since it would only
compare with the previous record. I just don't understand why it does not
get the value of the previous record, whatever offset I give it. Any idea
what I might be doing wrong?

Thanks in advance.

Regards,

Laurent.

On Fri, 27 Nov 2020 at 03:28, Leonard Xu  wrote:

> Hi, Laurent
>
> Basically, I need to deduplicate, *but only keeping in the deduplication
> state the latest value of the changed column* to compare with. While here
> it seems to keep all previous values…
>
>
> You can use ` ORDER BY proctime() DESC`  in the deduplication query,  it
> will keep last row, I think that’s what you want.
>
> BTW, the deduplication has supported event time in 1.12, this will be
> available soon.
>
> Best,
> Leonard
>
>

-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu *

*research.euranova.eu* 

-- 
♻ Be green, keep it on the screen


Re: Batch compressed file output

2020-11-27 Thread Matthias Pohl
Hi Flavio,
others might have better ideas to solve this but I'll give it a try: Have
you considered extending FileOutputFormat to achieve what you need? That
approach (which is discussed in [1]) sounds like something you could do.
Another pointer I want to give is the DefaultRollingPolicy [2]. It looks
like it partially does what you're looking for. I'm adding Kostas to this
conversation as he worked on the RollingPolicy. Maybe, he has some more
insights.
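
A minimal sketch of that first approach, covering the compression part (gzip as an
example, and relying on the protected stream field of FileOutputFormat; untested,
and the size-based splitting from point 1 would still need extra logic on top):

import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.core.fs.Path;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Writes each record as one line into a gzip-compressed file per parallel task.
public class GzipTextOutputFormat<T> extends FileOutputFormat<T> {

  private transient Writer writer;

  public GzipTextOutputFormat(Path outputPath) {
    super(outputPath);
  }

  @Override
  public void open(int taskNumber, int numTasks) throws IOException {
    super.open(taskNumber, numTasks); // opens this.stream
    writer = new OutputStreamWriter(
        new GZIPOutputStream(this.stream), StandardCharsets.UTF_8);
  }

  @Override
  public void writeRecord(T record) throws IOException {
    writer.write(record.toString());
    writer.write('\n');
  }

  @Override
  public void close() throws IOException {
    if (writer != null) {
      writer.flush();
      writer.close(); // finishes the gzip trailer
    }
    super.close();
  }
}

It would then be used along the lines of
dataSet.output(new GzipTextOutputFormat<>(new Path("file:///tmp/out"))).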

I hope that helps.

Best,
Matthias

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/output-writer-td2296.html
[2]
https://github.com/apache/flink/blob/5ff96966b59e0d9a7b55ebae9e252b1c9aafd4ea/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java#L40

On Fri, Nov 27, 2020 at 11:07 AM Flavio Pompermaier 
wrote:

> Hello guys,
> I have to write my batch data (Dataset) to a file format. Actually
> what I need to do is:
>
>1. split the data if it exceeds some size threshold  (by line count or
>max MB)
>2. compress the output data (possibly without converting to the hadoop
>format)
>
> Are there any suggestions / recommendations about that?
>
> Best,
> Flavio
>


Re: Re: flink sql cdc writing data to mysql, related class not found

2020-11-27 Thread cljb...@163.com
Thanks for the reply!
I just found the problem: the pom dependency copied from the Maven website had its scope set to test... Changing it to compile fixed it.



cljb...@163.com
 
From: Jark Wu
Sent: 2020-11-27 19:14
To: user-zh
Subject: Re: flink sql cdc writing data to mysql, related class not found
It is probably a conflict between the flink-json you added and the flink-json already bundled with the framework; maybe the flink-json version you added is not 1.11.x?
 
On Fri, 27 Nov 2020 at 19:03, cljb...@163.com  wrote:
 
> The relevant dependencies have all been added; I don't know what causes the problem below, please help!
> The dependencies added are:
> flink-connector-mysql-cdc
> flink-format-changelog-json
> flink-json
>
> The error message is as follows:
>
> java.util.ServiceConfigurationError:
> org.apache.flink.table.factories.Factory: Provider
> com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not
> be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:232)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
> at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/formats/json/JsonOptions
> at
> com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.<init>(ChangelogJsonFormatFactory.java:53)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
> ... 25 common frames omitted
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.formats.json.JsonOptions
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 32 common frames omitted
>
>
>
> cljb...@163.com
>


Re: flink sql cdc writing data to mysql, related class not found

2020-11-27 Thread Jark Wu
It is probably a conflict between the flink-json you added and the flink-json already bundled with the framework; maybe the flink-json version you added is not 1.11.x?

On Fri, 27 Nov 2020 at 19:03, cljb...@163.com  wrote:

> The relevant dependencies have all been added; I don't know what causes the problem below, please help!
> The dependencies added are:
> flink-connector-mysql-cdc
> flink-format-changelog-json
> flink-json
>
> The error message is as follows:
>
> java.util.ServiceConfigurationError:
> org.apache.flink.table.factories.Factory: Provider
> com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not
> be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:232)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
> at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/formats/json/JsonOptions
> at
> com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.<init>(ChangelogJsonFormatFactory.java:53)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
> ... 25 common frames omitted
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.formats.json.JsonOptions
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 32 common frames omitted
>
>
>
> cljb...@163.com
>


Reply: Re: Reply: Stateful operator fails to save checkpoint

2020-11-27 Thread 魏积乾
flink-csv-1.11.2.jar
flink-dist_2.11-1.11.2.jar
flink-json-1.11.2.jar
flink-shaded-zookeeper-3.4.14.jar
flink-table_2.11-1.11.2.jar
flink-table-blink_2.11-1.11.2.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
flink-metrics-prometheus_2.12-1.11.2.jar

Sorted by time, these are the latest jars.



Sent from my iPhone


-- Original message --
From: 王默

flink sql cdc writing data to mysql, related class not found

2020-11-27 Thread cljb...@163.com
The relevant dependencies have all been added; I don't know what causes the problem below, please help!
The dependencies added are:
flink-connector-mysql-cdc
flink-format-changelog-json
flink-json

The error message is as follows:

java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: 
Provider com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory 
could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
Caused by: java.lang.NoClassDefFoundError: 
org/apache/flink/formats/json/JsonOptions
at 
com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.<init>(ChangelogJsonFormatFactory.java:53)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
... 25 common frames omitted
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.formats.json.JsonOptions
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 32 common frames omitted



cljb...@163.com


Re: FLINK SQL out-of-order Kafka message consumption

2020-11-27 Thread bulterman
I checked the upstream and found that setting the parallelism to 1 on the source side eliminates the out-of-order results.

















At 2020-11-27 17:44:23, "bulterman" <15618338...@163.com> wrote:
>Hi All,
>The Kafka messages contain a "turnover" field that increases over time. I wrote a UDAF that accumulates the difference between the current turnover and the previous record's turnover, and the difference sometimes comes out negative.
>Inspecting the topic with a tool, the messages are in order and there is only 1 partition. Flink version 1.11.2.


Re: Reply: Stateful operator fails to save checkpoint

2020-11-27 Thread 王默






Could you tell me specifically which jar was not upgraded? Or does something need to be copied from opt into lib?











At 2020-11-27 17:34:39, "魏积乾" wrote:
>I just ran into this as well after upgrading from 1.10. One job could save checkpoints while another kept failing; I then checked the jars under lib, found some had not been upgraded, and replaced them. I also changed state.checkpoints.dir in the configuration file.
>Hope that helps.
>
>
>
>Sent from my iPhone
>
>
>-- Original message --
>From: 王默  Sent: Nov 27, 2020, 17:22
>To: user-zh  Subject: Reply: Stateful operator fails to save checkpoint


Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Dongwon Kim
Hi Timo,

Okay, then the aggregate function should look like this:

>  public static class Agg extends AggregateFunction<Integer[], ArrayList<Integer>> {
> @Override
> public ArrayList<Integer> createAccumulator() {
>   return new ArrayList<>();
> }
> @Override
> public Integer[] getValue(ArrayList<Integer> acc) {
>   return acc.toArray(new Integer[0]);
> }
> public void accumulate(ArrayList<Integer> acc, int i) {
>   acc.add(i);
> }
> @Override
> public TypeInformation<Integer[]> getResultType() {
>   return OBJECT_ARRAY(Types.INT);
> }
>   }


Now the program outputs:

> 2> +I([1, 2])


Thanks,

Dongwon

On Fri, Nov 27, 2020 at 5:38 PM Timo Walther  wrote:

> Hi,
>
> first of all we don't support ListTypeInfo in Table API. Therefore, it
> is treated as a RAW type. The exception during exception creation is a
> bug that should be fixed in future version. But the mismatch is valid:
>
> ARRAY<INT> is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`.
> Can you try this as the result type of your aggregate function.
>
> Regards,
> Timo
>
>
> On 26.11.20 18:13, Dongwon Kim wrote:
> > Hello,
> >
> > I'm using Flink-1.11.2.
> >
> > Let's assume that I want to store on a table the result of the following
> > UDAF:
> >
> >public class Agg extends AggregateFunction<List<Integer>,
> > List<Integer>> {
> >  @Override
> >  public List<Integer> createAccumulator() {
> >return new LinkedList<>();
> >  }
> >  @Override
> >  public List<Integer> getValue(List<Integer> acc) {
> >return acc;
> >  }
> >  public void accumulate(List<Integer> acc, int i) {
> >acc.add(i);
> >  }
> >  @Override
> >  public TypeInformation<List<Integer>> getResultType() {
> >return new ListTypeInfo<>(Integer.class);
> >  }
> >}
> >
> >
> > The main program looks as follow:
> >
> > public class TestMain {
> >public static void main(String[] args) {
> >  EnvironmentSettings settings = EnvironmentSettings.newInstance()
> >.inBatchMode()
> >.build();
> >  TableEnvironment tEnv = TableEnvironment.create(settings);
> >  tEnv.executeSql(
> >"CREATE TEMPORARY FUNCTION agg AS '" + Agg.class.getName() +
> "'"
> >  );
> >  Table t = tEnv.sqlQuery(
> >"SELECT agg(c2)\n" +
> >  "FROM (VALUES (ROW('a',1)), (ROW('a',2))) AS T(c1,c2)\n" +
> >  "GROUP BY c1"
> >  );
> >  tEnv.executeSql(
> >"CREATE TABLE output (a ARRAY<INT>) WITH ('connector' =
> 'print')"
> >  );
> >  /**
> >   * root
> >   *  |-- EXPR$0: RAW('java.util.List', ?)
> >   */
> >  t.printSchema();
> >  t.executeInsert("output" );
> >}
> > }
> >
> >
> > This program fails with the following exception:
> >
> > Exception in thread "main"
> > org.apache.flink.table.api.TableException: A raw type backed by type
> > information has no serializable string representation. It needs to
> > be resolved into a proper raw type.
> > at
> >
>  
> org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
> > at
> >
>  
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> > at
> >
>  
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> > at
> >
>  
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at
> >
>  
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > at
> >
>  
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
> > at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
> > at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
> > at scala.Option.map(Option.scala:146)
> > at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> > at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> > at
> >
>  
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> > at
> >
>  
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at
> >
>  
> 

Batch compressed file output

2020-11-27 Thread Flavio Pompermaier
Hello guys,
I have to write my batch data (Dataset) to a file format. Actually
what I need to do is:

   1. split the data if it exceeds some size threshold  (by line count or
   max MB)
   2. compress the output data (possibly without converting to the hadoop
   format)

Are there any suggestions / recommendations about that?

Best,
Flavio


FLINK SQL out-of-order Kafka message consumption

2020-11-27 Thread bulterman
Hi All,
The Kafka messages contain a "turnover" field that increases over time. I wrote a UDAF that accumulates the difference between the current turnover and the previous record's turnover, and the difference sometimes comes out negative.
Inspecting the topic with a tool, the messages are in order and there is only 1 partition. Flink version 1.11.2.

Data loss when writing to HBase in real time from Flink

2020-11-27 Thread bradyMk
Hi everyone, I recently have a project that writes data into HBase. I am using the
BufferedMutator.flush method of the HBase API, flushing every 500 records, but I found that this approach occasionally fails to write a dozen or so rows, and in that case those rows are lost. Does anyone have suggestions?
What method should be used to write to HBase in real time, and how can I make sure no data is lost? Thanks!



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Reply: Stateful operator fails to save checkpoint

2020-11-27 Thread 王默












Thanks for the idea. state.checkpoints.dir is already set in the configuration file; I will check whether any jar was not upgraded.





At 2020-11-27 17:34:39, "魏积乾" wrote:
>I just ran into this as well after upgrading from 1.10. One job could save checkpoints while another kept failing; I then checked the jars under lib, found some had not been upgraded, and replaced them. I also changed state.checkpoints.dir in the configuration file.
>Hope that helps.
>
>
>
>Sent from my iPhone
>
>
>-- Original message --
>From: 王默  Sent: Nov 27, 2020, 17:22
>To: user-zh  Subject: Reply: Stateful operator fails to save checkpoint


Stateful operator fails to save checkpoint

2020-11-27 Thread 王默
Hi, I would like to ask about a problem that has been troubling me for a few days.
In my project I use state to keep some data for deduplication. After enabling checkpointing, the web UI shows that the stateful operator cannot save its state data to the checkpoint, so the whole checkpoint fails. Occasionally the first checkpoint succeeds, but all subsequent ones fail.
For the state backend I tried both MemoryStateBackend and FsStateBackend, neither works; FsStateBackend uses HDFS.
Also, in the logs of the corresponding TaskManager (looked up by job id) there is no related exception information.
The Flink version is 1.11.2.


Screenshot of the checkpoint failure in the web UI, image link: https://imgchr.com/i/Dr3PNn
The test code below is a simplified version with the business logic removed.


Test code:
public class TestStateProcess extends KeyedProcessFunction {
private transient ValueState<Integer> userCount;


@Override
public void open(Configuration parameters) throws Exception {
try {
ValueStateDescriptor<Integer> descriptor = new 
ValueStateDescriptor<>("userId", TypeInformation.of(new 
TypeHint<Integer>() {}));


StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(org.apache.flink.api.common.time.Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
descriptor.enableTimeToLive(ttlConfig);


userCount = getRuntimeContext().getState(descriptor);
} catch (Exception e) {
e.printStackTrace();
}
}


@Override
public void processElement(NLMessage value, Context ctx, 
Collector<NLMessage> out) throws Exception {
try {
if (null == userCount.value()) {
userCount.update(1);


} else {
userCount.update(userCount.value() + 1);
}


if (userCount.value() > 10) {
System.out.println(new Date() + " userId: " + 
ctx.getCurrentKey() + " count: " + userCount.value());
}


out.collect(value);
} catch (IOException e) {
e.printStackTrace();
}
}
}


Checkpoint configuration:
env.setStateBackend(new 
MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false));
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


Many thanks!




 

Reply: Stateful operator fails to save checkpoint

2020-11-27 Thread 魏积乾
I just ran into this as well after upgrading from 1.10. One job could save checkpoints while another kept failing; I then checked the jars under lib, found some had not been upgraded, and replaced them. I also changed state.checkpoints.dir in the configuration file.
Hope that helps.



Sent from my iPhone


-- Original message --
From: 王默

Stateful operator fails to save checkpoint

2020-11-27 Thread 王默
Hi, I would like to ask about a problem that has been troubling me for a few days.
In my project I use state to keep some data for deduplication. After enabling checkpointing, the web UI shows that the stateful operator cannot save its state data to the checkpoint, so the whole checkpoint fails. Occasionally the first checkpoint succeeds, but all subsequent ones fail.
For the state backend I tried both MemoryStateBackend and FsStateBackend, neither works; FsStateBackend uses HDFS.
Also, in the logs of the corresponding TaskManager (looked up by job id) there is no related exception information.
The Flink version is 1.11.2.


The attachment is a screenshot of the checkpoint failure in the web UI; the test code below is a simplified version with the business logic removed.


Test code:
public class TestStateProcess extends KeyedProcessFunction {
private transient ValueState<Integer> userCount;


@Override
public void open(Configuration parameters) throws Exception {
try {
ValueStateDescriptor<Integer> descriptor = new 
ValueStateDescriptor<>("userId", TypeInformation.of(new 
TypeHint<Integer>() {}));


StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(org.apache.flink.api.common.time.Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
descriptor.enableTimeToLive(ttlConfig);


userCount = getRuntimeContext().getState(descriptor);
} catch (Exception e) {
e.printStackTrace();
}
}


@Override
public void processElement(NLMessage value, Context ctx, 
Collector<NLMessage> out) throws Exception {
try {
if (null == userCount.value()) {
userCount.update(1);


} else {
userCount.update(userCount.value() + 1);
}


if (userCount.value() > 10) {
System.out.println(new Date() + " userId: " + 
ctx.getCurrentKey() + " count: " + userCount.value());
}


out.collect(value);
} catch (IOException e) {
e.printStackTrace();
}
}
}


Checkpoint configuration:
env.setStateBackend(new 
MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false));
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


Many thanks!

Re: Duplication error on Kafka Connector Libraries

2020-11-27 Thread Arvid Heise
The most common cause of such issues is usually class loading.

You have probably also added flink-connector-kafka to flink-dist/lib, but the
connector is only meant to be bundled with your job jar, afaik. Right now you
have the Kafka classes loaded both in the user-code classloader and in the
system classloader of Flink itself.

On Fri, Nov 27, 2020 at 12:24 AM Kevin Kwon  wrote:

> Hi community, I'm testing out 1.12-SNAPSHOT in master branch
>
> I built my application with library 'flink-connector-kafka' but when I
> start the app, I get
>
> Caused by: org.apache.kafka.common.KafkaException: class
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of org.apache.kafka.common.serialization.Deserializer
>
> while constructing KafkaConsumer class. Is this normal behavior?
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: PyFlink Table API and UDF Limitations

2020-11-27 Thread Niklas Wilcke
Hi Xingbo,

thanks for sharing. This is very interesting.

Regards,
Niklas

> On 27. Nov 2020, at 03:05, Xingbo Huang  wrote:
> 
> Hi Niklas,
> 
> Thanks a lot for supporting PyFlink. In fact, your requirement for multiple 
> input and multiple output is essentially Table Aggregation Functions[1]. 
> Although PyFlink does not support it yet, we have listed it in the release 
> 1.13 plan. In addition, row-based operations[2] that are very user-friendly 
> to machine learning users are also included in the 1.13 plan.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#table-aggregation-functions
>  
> 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>  
> 
> 
> Best,
> Xingbo
> 
> Niklas Wilcke  wrote on Thu, Nov 26, 2020 at 5:11 PM:
> Hi Xingbo,
> 
> thanks for taking care and letting me know. I was about to share an example, 
> how to reproduce this.
> Now I will wait for the next release candidate and give it a try.
> 
> Regards,
> Niklas
> 
> 
> --
> niklas.wil...@uniberg.com 
> Mobile: +49 160 9793 2593
> Office: +49 40 2380 6523
> 
> Simon-von-Utrecht-Straße 85a
> 20359 Hamburg
> 
> UNIBERG GmbH
> Registergericht: Amtsgericht Kiel HRB SE-1507
> Geschäftsführer: Andreas Möller, Martin Ulbricht
> 
> Information Art. 13 DSGVO B2B:
> Für die Kommunikation mit Ihnen verarbeiten wir ggf. Ihre personenbezogenen 
> Daten.
> Alle Informationen zum Umgang mit Ihren Daten finden Sie unter 
> https://www.uniberg.com/impressum.html 
> . 
> 
>> On 26. Nov 2020, at 02:59, Xingbo Huang > > wrote:
>> 
>> Hi Niklas,
>> 
>> Regarding `Exception in thread "grpc-nio-worker-ELG-3-2" 
>> java.lang.NoClassDefFoundError: 
>> org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`, 
>> it does not affect the correctness of the result. The reason is that some 
>> resources are released asynchronously when Grpc Server is shut down[1] . 
>> After the UserClassLoader unloads the class, the asynchronous thread tries 
>> to release the resources and throw NotClassFoundException, but the content 
>> of the data result has been sent downstream, so the correctness of the 
>> result will not be affected.
>> 
>> Regarding the details of the specific causes, I have explained in the flink 
>> community[2] and the beam community[3], and fixed them in the flink 
>> community. There will be no such problem in the next version of release 
>> 1.11.3 and 1.12.0.
>> 
>> [1] 
>> https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150
>>  
>> 
>> [2] https://issues.apache.org/jira/browse/FLINK-20284 
>> 
>> [3] https://issues.apache.org/jira/browse/BEAM-5397 
>> 
>> 
>> Best,
>> Xingbo
>> 
>> 
>> Dian Fu  wrote on Mon, Nov 16, 2020 at 9:10 PM:
>> Hi Niklas,
>> 
>>> How can I ingest data in a batch table from Kafka or even better 
>>> Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch 
>>> isn't offering a source at all.
>>> The only workaround which comes to my mind is to use the Kafka streaming 
>>> source and to apply a single very large window to create a bounded table. 
>>> Do you think that would work?
>>> Are there other options available? Maybe converting a Stream to a bounded 
>>> table is somehow possible? Thank you!
>> 
>> 
>> I think you are right that Kafka still doesn't support batch and there is no 
>> ES source for now. Another option is you could load the data into a 
>> connector which supports batch. Not sure if anybody else has a better idea 
>> about this.
>> 
>>> I found one cause of this problem and it was mixing a Scala 2.12 Flink 
>>> installation with PyFlink, which has some 2.11 jars in its opt folder. I 
>>> think the JVM just skipped the class definitions, because they weren't 
>>> compatible. I actually wasn't aware of the fact that PyFlink comes with 
>>> prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it 
>>> would make sense to point that out in the documentation. I think I never 
>>> read that and it might be missing. Unfortunately there is still one 
>>> Exception showing up at the very end of the job in the taskmanager log. I 
>>> did the verification you asked for and the class is present in both jar 
>>> files. The one which comes with Flink in the opt folder and the one of 
>>> PyFlink. You can find the log 

Re: Caching

2020-11-27 Thread Dongwon Kim
Hi Navneeth,

I didn't quite understand how async io can be used here. It would be great
> if you can share some info on it.

You need to add an async operator in the middle of your pipeline in order
to enrich your input data. [1] and [2] will help you.
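
A rough sketch of such an async enrichment operator, following the Lettuce +
async-io setup described in this thread (the key scheme, types and output format
are made up for illustration, and error handling is omitted):

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;

// Looks up the attributes of a road ID in a versioned Redis hash without blocking.
public class RedisEnrichment extends RichAsyncFunction<String, String> {

  private transient RedisClient client;
  private transient StatefulRedisConnection<String, String> connection;
  private transient RedisAsyncCommands<String, String> commands;

  @Override
  public void open(Configuration parameters) {
    client = RedisClient.create("redis://localhost:6379");
    connection = client.connect();
    commands = connection.async();
  }

  @Override
  public void asyncInvoke(String roadId, ResultFuture<String> resultFuture) {
    // hget on the hash that holds the currently served mapping version.
    commands.hget("roadmap:v42", roadId)
        .thenAccept(attr -> resultFuture.complete(
            Collections.singleton(roadId + "," + attr)));
  }

  @Override
  public void close() {
    connection.close();
    client.shutdown();
  }
}

It is then attached to the pipeline with AsyncDataStream.unorderedWait(stream,
new RedisEnrichment(), 1000, TimeUnit.MILLISECONDS, 100), as described in [1].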

Also how are you propagating any changes to values?

I need to maintain the mapping of road ID to various attributes of each
road, and the mapping is updated every week.
I use keys for versioning and I use Hash [3] for value in order to store a
mapping.
When a new mapping is prepared I'm uploading it using a fresh key while the
previous version is being served to Flink (via async io).
Such concurrent read/write is possible in Redis when you turn off
transaction when creating Redis client's pipeline.
When the new mapping is completely uploaded, I inform my Flink pipeline of
the new mapping via Kafka.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
[2] https://www.youtube.com/watch?v=UParyxe-2Wc
[3] https://redis.io/topics/data-types#hashes
[4] https://github.com/andymccurdy/redis-py#pipelines

Best,

Dongwon

On Fri, Nov 27, 2020 at 4:31 PM Navneeth Krishnan 
wrote:

> Thanks Dongwon. It was extremely helpful. I didn't quite understand how
> async io can be used here. It would be great if you can share some info on
> it.
>
> Also how are you propagating any changes to values?
>
> Regards,
> Navneeth
>
> On Thu, Nov 26, 2020 at 6:26 AM Dongwon Kim  wrote:
>
>> Oops, I forgot to mention that when doing bulk insert into Redis, you'd
>> better open a pipeline with a 'transaction' property set to False [1].
>>
>> Otherwise, API calls from your Flink job will be timeout.
>>
>> [1] https://github.com/andymccurdy/redis-py#pipelines
>>
>> On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim 
>> wrote:
>>
>>> Hi Navneeth,
>>>
>>> I reported a similar issue to yours before [1] but I took the
>>> broadcasting approach at first.
>>>
>>> As you already anticipated, broadcasting is going to use more memory
>>> than your current approach based on a static object on each TM .
>>>
>>> And the broadcasted data will be treated as operator state and will be
>>> periodically checkpointed with serialization overhead & garbage collections.
>>> These are not negligible at all if you're not carefully choosing
>>> serialization strategy as explained in [2].
>>> Even with the proper one, I've experienced mild back pressure whenever
>>> - checkpoint is in progress (AFAIK, incremental checkpoint has nothing
>>> to do with operator states)
>>> - cache is being broadcasted
>>>
>>> For that reason, I decided to populate data on Redis but it also calls
>>> for design decisions:
>>> - which Java client to use? Jedis [3]? Lettuce [4]?
>>> - how to invoke APIs calls inside Flink? synchronously or asynchronously?
>>>
>>> Currently I'm very satisfied with Lettuce with Flink's async io [5] with
>>> very small memory footprint and without worrying about serialization
>>> overhead and garbage collections.
>>> Lettuce supports asynchronous communication so it works perfectly with
>>> Flink's async io.
>>> I bet you'll be very disappointed with invoking Jedis synchronously
>>> inside ProcessFunction.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> [1]
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
>>> [2]
>>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>>> [3] https://github.com/redis/jedis
>>> [4] https://lettuce.io/
>>> [5]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>>>
>>> On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan <
>>> reachnavnee...@gmail.com> wrote:
>>>
 Hi All,

 We have a flink streaming job processing around 200k events per second.
 The job requires a lot of less frequently changing data (sort of static but
 there will be some changes over time, say 5% change once per day or so).
 There are about 12 caches with some containing approximately 20k
 entries whereas a few with about 2 million entries.

 In the current implementation we are using in-memory lazy loading
 static cache to populate the data and the initialization happens in open
 function. The reason to choose this approach is because we have allocated
 around 4GB extra memory per TM for these caches and if a TM has 6 slots the
 cache can be shared.

 Now the issue we have with this approach is everytime when a container
 is restarted or a new job is deployed it has to populate the cache again.
 Sometimes this lazy loading takes a while and it causes back pressure as
 well. We were thinking to move this logic to the broadcast stream but since
 the data has to be stored per slot it would increase the memory consumption
 by a lot.

 Another option that we were thinking of is to replace the current near
 far cache that uses rest api to 

Re: Error when executing mvn build

2020-11-27 Thread renjiyun
That should be generated code; delete it and package again.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


FlinkKafkaProducer seems to reconnect the producer on every transaction commit, which produces a huge amount of log output

2020-11-27 Thread Wz


Below is the addSink code:

result.addSink(new FlinkKafkaProducer(
        DataSourceConfig.ResultTopic,
        new MyKafkaSerializationSchema(DataSourceConfig.ResultTopic),
        ConnectToKafka.getKafKaProducerProperties(),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        3))
    .setParallelism(1);

KafkaProducer configuration:

props_Producer.put("bootstrap.servers", DataSourceConfig.bootstrapServersIPAddress);
props_Producer.put("acks", "all");
props_Producer.put("request.timeout.ms", 3000);


In short, I don't really understand why the log messages below, which should only be printed when a connection is established, keep being printed over and over. My guess is that the producer keeps reconnecting; the log below is printed almost without pause and has filled up the disk. Could anyone point out possible causes?
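As a stopgap while the cause is unclear, the Kafka client loggers can be raised above INFO so that output like the excerpt below stops filling the disk. A minimal sketch, assuming Flink's default log4j2 properties format (an older log4j 1.x setup would use log4j.logger.org.apache.kafka=WARN instead):

# conf/log4j.properties
logger.kafka.name = org.apache.kafka
logger.kafka.level = WARN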


2020-11-20 15:55:56,672 INFO 
org.apache.kafka.clients.producer.KafkaProducer  [] - [Producer
clientId=producer-CepOperator -> Sink:
Unnamed-1848139bf30d999062379bb9e1d14fd8-2, transactionalId=CepOperator ->
Sink: Unnamed-1848139bf30d999062379bb9e1d14fd8-2] Instantiated a
transactional producer.
2020-11-20 15:55:56,672 INFO 
org.apache.kafka.clients.producer.KafkaProducer  [] - [Producer
clientId=producer-CepOperator -> Sink:
Unnamed-1848139bf30d999062379bb9e1d14fd8-2, transactionalId=CepOperator ->
Sink: Unnamed-1848139bf30d999062379bb9e1d14fd8-2] Overriding the default
retries config to the recommended value of 2147483647 since the idempotent
producer is enabled.
2020-11-20 15:55:56,676 INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 2.4.1
2020-11-20 15:55:56,676 INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: c57222ae8cd7866b
2020-11-20 15:55:56,676 INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1605858956676
2020-11-20 15:55:56,676 INFO 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting
FlinkKafkaInternalProducer (1/1) to produce into default topic
spc_testResult
2020-11-20 15:55:56,676 INFO 
org.apache.kafka.clients.producer.internals.TransactionManager [] -
[Producer clientId=producer-CepOperator -> Sink:
Unnamed-1848139bf30d999062379bb9e1d14fd8-2, transactionalId=CepOperator ->
Sink: Unnamed-1848139bf30d999062379bb9e1d14fd8-2] ProducerId set to -1 with
epoch -1
2020-11-20 15:55:56,678 INFO  org.apache.kafka.clients.Metadata [] - [Producer
clientId=producer-CepOperator -> Sink:
Unnamed-1848139bf30d999062379bb9e1d14fd8-2, transactionalId=CepOperator ->
Sink: Unnamed-1848139bf30d999062379bb9e1d14fd8-2] Cluster ID:
8IUUMEvGQLKWsQRfKWc9Hw
2020-11-20 15:55:56,779 INFO 
org.apache.kafka.clients.producer.internals.TransactionManager [] -
[Producer clientId=producer-CepOperator -> Sink:
Unnamed-1848139bf30d999062379bb9e1d14fd8-2, transactionalId=CepOperator ->
Sink: Unnamed-1848139bf30d999062379bb9e1d14fd8-2] ProducerId set to 50001
with epoch 1753
2020-11-20 15:55:56,793 INFO 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction []
- FlinkKafkaProducer 1/1 - checkpoint 5383 complete, committing transaction
TransactionHolder{handle=KafkaTransactionState [transactionalId=CepOperator
-> Sink: Unnamed-1848139bf30d999062379bb9e1d14fd8-0, producerId=50002,
epoch=1752], transactionStartTime=1605858953780} from checkpoint 5383
2020-11-20 15:55:56,793 INFO 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
[] - Flushing new partitions
2020-11-20 15:55:56,793 INFO 
org.apache.kafka.clients.producer.KafkaProducer  [] - [Producer
clientId=producer-CepOperator -> Sink:
Unnamed-1848139bf30d999062379bb9e1d14fd8-0, transactionalId=CepOperator ->
Sink: Unnamed-1848139bf30d999062379bb9e1d14fd8-0] Closing the Kafka producer
with timeoutMillis = 0 ms.
2020-11-20 15:55:56,793 INFO 
org.apache.kafka.clients.producer.KafkaProducer  [] - [Producer
clientId=producer-CepOperator -> Sink:
Unnamed-1848139bf30d999062379bb9e1d14fd8-0, transactionalId=CepOperator ->
Sink: Unnamed-1848139bf30d999062379bb9e1d14fd8-0] Proceeding to force close
the producer since pending requests could not be completed within timeout 0
ms.
2020-11-20 15:55:59,670 INFO 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
[] - Flushing new partitions
2020-11-20 15:55:59,671 INFO 
org.apache.kafka.clients.producer.ProducerConfig [] -
ProducerConfig values: 
acks = all
batch.size = 16384
bootstrap.servers = [192.168.81.128:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = 
compression.type = none
connections.max.idle.ms = 54
delivery.timeout.ms = 12
enable.idempotence = false
interceptor.classes = []
key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 6
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3

Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Timo Walther

Hi,

first of all, we don't support ListTypeInfo in the Table API. Therefore, it
is treated as a RAW type. The exception thrown while building the error
message is a bug that should be fixed in a future version. But the mismatch
itself is valid:


ARRAY<INT> is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`.
Can you try this as the result type of your aggregate function?
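
For example, a minimal sketch of that change (an untested assumption based on
the code quoted below): keep the List accumulator, but expose Integer[] as the
result via Types.OBJECT_ARRAY.

import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.AggregateFunction;

public class Agg extends AggregateFunction<Integer[], List<Integer>> {
  @Override
  public List<Integer> createAccumulator() {
    return new LinkedList<>();
  }
  @Override
  public Integer[] getValue(List<Integer> acc) {
    return acc.toArray(new Integer[0]); // matches ARRAY<INT> on the sink side
  }
  public void accumulate(List<Integer> acc, int i) {
    acc.add(i);
  }
  @Override
  public TypeInformation<Integer[]> getResultType() {
    return Types.OBJECT_ARRAY(Types.INT); // object array of Integer, not a RAW List
  }
}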


Regards,
Timo


On 26.11.20 18:13, Dongwon Kim wrote:

Hello,

I'm using Flink-1.11.2.

Let's assume that I want to store in a table the result of the following 
UDAF:


   public class Agg extends AggregateFunction<List<Integer>, List<Integer>> {
     @Override
     public List<Integer> createAccumulator() {
       return new LinkedList<>();
     }
     @Override
     public List<Integer> getValue(List<Integer> acc) {
       return acc;
     }
     public void accumulate(List<Integer> acc, int i) {
       acc.add(i);
     }
     @Override
     public TypeInformation<List<Integer>> getResultType() {
       return new ListTypeInfo<>(Integer.class);
     }
   }


The main program looks as follow:

public class TestMain {
   public static void main(String[] args) {
     EnvironmentSettings settings = EnvironmentSettings.newInstance()
       .inBatchMode()
       .build();
     TableEnvironment tEnv = TableEnvironment.create(settings);
     tEnv.executeSql(
       "CREATE TEMPORARY FUNCTION agg AS '" + Agg.class.getName() + "'"
     );
     Table t = tEnv.sqlQuery(
       "SELECT agg(c2)\n" +
         "FROM (VALUES (ROW('a',1)), (ROW('a',2))) AS T(c1,c2)\n" +
         "GROUP BY c1"
     );
     tEnv.executeSql(
       "CREATE TABLE output (a ARRAY) WITH ('connector' = 'print')"
     );
     /**
      * root
      *  |-- EXPR$0: RAW('java.util.List', ?)
      */
     t.printSchema();
     t.executeInsert("output" );
   }
}


This program fails with the following exception:

Exception in thread "main"
org.apache.flink.table.api.TableException: A raw type backed by type
information has no serializable string representation. It needs to
be resolved into a proper raw type.
at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
at my.TestMain.main(TestMain.java:62)


I found that two 

Re: flink 1.11.2 cdc: sink ordering problem for CDC SQL result tables; restoring the job from a savepoint with a different parallelism leads to incorrect sink results!

2020-11-27 Thread jindy_liu
Thanks Jark! I've spent the last few days on performance tuning.
1. For this simple scenario, I can currently work around the issue by adding a join
key to the primary key of the sink table test_status, i.e. using the two columns id and status as the key. That makes the data eventually consistent, so it is a workaround that does give consistency. More complex statements feel hard to handle and I'm hesitant to use them, mainly because it's unclear how this out-of-order behaviour affects other operators; it's easy to get wrong, so it really should be handled properly inside the Flink
framework. Jark, do you have any advice on using Flink SQL CDC here?

2. On performance: Flink's default RocksDB parameters really do perform poorly!
Following the article you shared, I tuned a few parameters (write buffer, buffer size) and switched to SSD disks, which gave a good improvement. That explains why raising the parallelism earlier never saturated the CPU: it was waiting on I/O. There still seems to be a lot of room for improvement, but I haven't found the knack for which parameters matter most.

3. Also, on performance monitoring: the metrics in Flink's web
UI are a bit hard to use. Are there any Prometheus + Grafana best practices? There are a lot of metrics and building dashboards is tedious;
what's mainly needed is a ready-made dashboard configuration! (See the configuration sketch below.)
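
For points 2 and 3, a minimal flink-conf.yaml sketch (the keys exist in Flink 1.11, but the values below are only placeholders to tune; the Prometheus reporter also needs the flink-metrics-prometheus jar from opt/ copied to lib/ or loaded as a plugin):

# RocksDB tuning; AFAIK the size options only take full effect when
# state.backend.rocksdb.memory.managed is set to false.
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.thread.num: 4

# Expose metrics to Prometheus; Grafana then reads from Prometheus.
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249-9259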




--
Sent from: http://apache-flink.147419.n8.nabble.com/