Re: Re: [Internet]Re: Re: Some question with Flink state

2022-05-24 Thread lxk7...@163.com

刚看了下keygroup的原理,前面的内容大致能理解了,对于下面这段话
"map-state的话相当于某些固定的key group里面的key都可以通过map-state的user-key去分别存储"
我理解   
是因为如果使用value-state,一个task会存在多个key,不同的key的内容会进行替换,而使用map的话,就算同一个task有多个key,根据用户自定义的key还是可以匹配到的。
这样的话,大部分场景其实都适合使用map-state。


lxk7...@163.com
 
From: jurluo(罗凯)
Date: 2022-05-25 11:05
To: user-zh@flink.apache.org
Subject: Re: [Internet]Re: Re: Some question with Flink state
老哥,看起来好像没什么问题,相同的key都分配在了同个task,每个task会存在多种key是正常的。key会按最大并行度分成多个key 
group,然后固定的key 
group分配到各个task里。只能保证相同的key会到同一个task,不能保证一个task只有一个key。你这个需求用map-state才合适。map-state的话相当于某些固定的key
 group里面的key都可以通过map-state的user-key去分别存储。
 
> 2022年5月25日 上午10:45,lxk7...@163.com 写道:
> 
> 图片好像又挂了  我重发下
> hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
> 
> 
> 
>下面是我的代码及测试结果
> 
> 
> 
> 一.使用int类型
> 
> 
> 
>public class KeyByTest {
> 
> 
> 
> public static void main(String[] args) throws Exception {
> 
> 
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
> 
> env.setParallelism(10);
> 
> 
> 
> 
> 
> DataStreamSource dataDataStreamSource = 
> env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
> 
> 
> 
> new data(1, "123", "分类页"),
> 
> 
> 
> new data(2, "r-123", "搜索结果页"),
> 
> 
> 
> new data(1, "r-123", "我的页"),
> 
> 
> 
> new data(3, "r-4567", "搜索结果页")));
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> SingleOutputStreamOperator map = 
> dataDataStreamSource.keyBy(new MyKeySelector())
> 
> 
> 
> .map(new RichMapFunction() {
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String map(data data) throws Exception {
> 
> 
> 
> System.out.println(data.toString() + "的subtask为:" + 
> getRuntimeContext().getIndexOfThisSubtask() );
> 
> 
> 
> return data.toString();
> 
> 
> 
> }
> 
> 
> 
> });
> 
> 
> 
> 
> 
> 
> 
> env.execute("test");
> 
> 
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> class data{
> 
> 
> 
> private int id;
> 
> 
> 
> private String goods;
> 
> 
> 
> private String pageName;
> 
> 
> 
> 
> 
> public data(int id, String goods, String pageName) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> 
> 
> public data() {
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public int getId() {
> 
> 
> 
> return id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setId(int id) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getGoods() {
> 
> 
> 
> return goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setGoods(String goods) {
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getPageName() {
> 
> 
> 
> return pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setPageName(String pageName) {
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String toString() {
> 
> 
> 
> return "data{" +
> 
> 
> 
> "id='" + id + '\'' +
> 
> 
> 
> ", goods='" + goods + '\'' +
> 
> 
> 
> ", pageName='" + pageName + '\'' +
> 
> 
> 
> '}';
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 
> 
> class MyKeySelector implements KeySelector{
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public Integer getKey(data data) throws Exception {
> 
> 
> 
> return data.getId();
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 控制台的输出如下:
> https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
> 
> 
> 
> 可以看见数据根据id分组,分到了不同的subtask上。
> 
> 
> 
> 
> 
> 
> 
> 二.使用String类型  代码如下:
> 
> 
> 
> public class KeyByTest {
> 
> 
> 
> public static void main(String[] args) throws Exception {
> 
> 
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
> 
> env.setParallelism(10);
> 
> 
> 
> 
> 
> DataStreamSource dataDataStreamSource = 
> env.fromCollection(Arrays.asList(new data("1", "123", "首页"),
> 
> 
> 
> new data("1", "123", "分类页"),
> 
> 
> 
> new data("2", "r-123", "搜索结果页"),
> 
> 
> 
> new data("2", "r-123", "我的页"),
> 
> 
> 
> new data("3", "r-4567", "搜索结果页")));
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> SingleOutputStreamOperator map = 
> dataDataStreamSource.keyBy(new MyKeySelector())
> 
> 
> 
> .map(new RichMapFunction() {
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String map(data data) throws Exception {
> 
> 
> 
> System.out.println(data.toString() + "的subtask为:" + 
> getRuntimeContext().getIndexOfThisSubtask() );
> 
> 
> 
> return data.toString();
> 
> 
> 

Re: Kafka source 检测到分区变更时发生 WakeupException

2022-05-24 Thread Qingsheng Ren
Hi,

感谢反馈,看上去是一个 bug。可以在 Apache JIRA [1] 上新建一个 ticket 吗?

[1] https://issues.apache.org/jira

> On May 25, 2022, at 11:35, 邹璨  wrote:
> 
> flink版本: 1.14.3
> 模块:connectors/kafka 
> 问题描述:
> 我在使用kafka分区动态发现时发生了WakeupException,导致Job失败。异常信息如下
> 
> 2022-05-10 15:08:03
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
> at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
> at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
> at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
> unexpected exception while polling the records
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> Caused by: org.apache.kafka.common.errors.WakeupException
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
> at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:315)
> at 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:200)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> ... 6 more
> 
> 
> 
> 根据异常栈轨迹简单查阅源码发现,KafkaSource在处理分区变更时,会通过调用consumer.wakeup中断正在拉取数据的consumer。
> 随后在处理分区变更时会调用consumer.position方法,由于consumer已经被唤醒,此时会抛出WakeupException。
> 
> 
> 
> 
> 
> 
> 此电子邮件及其包含的信息将仅发送给上面列出的收件人,必须加以保护,并且可能包含法律或其他原因禁止披露的信息。
> 如果您不是此电子邮件的预期收件人,未经许可,您不得存储、复制、发送、分发或披露它。 禁止存储、复制、发送、分发或披露电子邮件的任何部分。
> 如果此电子邮件发送不正确,请立即联系 NAVER 
> Security(dl_naversecur...@navercorp.com)。然后删除所有原件、副本和附件。谢谢您的合作。
> ​
> This email and the information contained in this email are intended solely 
> for the recipient(s) addressed above and may contain information that is 
> confidential and/or privileged or whose disclosure is prohibited by law or 
> other reasons.
> If you are not the intended recipient of this email, please be advised that 
> any unauthorized storage, duplication, dissemination, distribution or 
> disclosure of all or part of this email is strictly prohibited.
> If you received this email in error, please immediately contact NAVER 
> Security (dl_naversecur...@navercorp.com) and delete this email and any 
> copies and attachments from your system. Thank you for your cooperation.​



Re: Source vs SourceFunction and testing

2022-05-24 Thread Qingsheng Ren
Hi Piotr,

I’d like to share my understanding about this. Source and SourceFunction are 
both interfaces to data sources. SourceFunction was designed and introduced 
earlier and as the project evolved, many shortcomings emerged. Therefore, the 
community re-designed the source interface and introduced the new Source API in 
FLIP-27 [1]. 

Finally we will deprecate the SourceFunction and use Source as the only 
interface for all data sources, but considering the huge cost of migration 
you’ll see SourceFunction and Source co-exist for some time, like the 
ParallelTestSource you mentioned is still on SourceFunction, and KafkaSource as 
a pioneer has already migrated to the new Source API.

I think the API to end users didn't change a lot: both 
env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, 
and you could apply downstream transformations onto it. 

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
 

Cheers,

Qingsheng

> On May 25, 2022, at 03:19, Piotr Domagalski  wrote:
> 
> Hi Ken,
> 
> Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> navigating the type system and being still confused about differences between 
> Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> 
> I think the DataStream<> type is what I'm looking for? That is, then I can 
> use:
> 
> DataStream source = env.fromSource(getKafkaSource(params), 
> watermarkStrategy, "Kafka");
> when using KafkaSource in the normal setup
> 
> and
> DataStream s = env.addSource(new ParallelTestSource<>(...));
> when using the testing source [1]
> 
> Does that sound right?
> 
> [1] 
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> 
> On Tue, May 24, 2022 at 7:57 PM Ken Krugler  
> wrote:
> Hi Piotr,
> 
> The way I handle this is via a workflow class that uses a builder approach to 
> specifying inputs, outputs, and any other configuration settings.
> 
> The inputs are typically DataStream.
> 
> This way I can separate out the Kafka inputs, and use testing sources that 
> give me very precise control over the inputs (e.g. I can hold up on right 
> side data to ensure my stateful left join junction is handling deferred joins 
> properly). I can also use Kafka unit test support (either kafka-junit or 
> Spring embedded Kafka) if needed.
> 
> Then in the actual tool class (with a main method) I’ll wire up the real 
> Kafka sources, with whatever logic is required to convert the consumer 
> records to what the workflow is expecting.
> 
> — Ken
> 
>> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
>> 
>> Hi,
>> 
>> I'm wondering: what ithe recommended way to structure the job which one 
>> would like to test later on with `MiniCluster`.
>> 
>> I've looked at the flink-training repository examples [1] and they tend to 
>> expose the main job as a class that accepts a `SourceFunction` and a 
>> `SinkFunction`, which make sense. But then, my job is normally constructed 
>> with `KafkaSource` which is then passed to `env.fromSource(...`.
>> 
>> Is there any recommended way of handling these discrepancies, ie. having to 
>> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>> 
>> [1] 
>> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>> 
>> -- 
>> Piotr Domagalski
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 
> 
> 
> -- 
> Piotr Domagalski



Kafka source 检测到分区变更时发生 WakeupException

2022-05-24 Thread 邹璨
flink版本: 1.14.3
模块:connectors/kafka 
问题描述:
我在使用kafka分区动态发现时发生了WakeupException,导致Job失败。异常信息如下

2022-05-10 15:08:03
java.lang.RuntimeException: One or more fetchers have encountered exception
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.common.errors.WakeupException
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
at 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:315)
at 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:200)
at 
org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more



根据异常栈轨迹简单查阅源码发现,KafkaSource在处理分区变更时,会通过调用consumer.wakeup中断正在拉取数据的consumer。
随后在处理分区变更时会调用consumer.position方法,由于consumer已经被唤醒,此时会抛出WakeupException。






此电子邮件及其包含的信息将仅发送给上面列出的收件人,必须加以保护,并且可能包含法律或其他原因禁止披露的信息。
如果您不是此电子邮件的预期收件人,未经许可,您不得存储、复制、发送、分发或披露它。 禁止存储、复制、发送、分发或披露电子邮件的任何部分。
如果此电子邮件发送不正确,请立即联系 NAVER 
Security(dl_naversecur...@navercorp.com)。然后删除所有原件、副本和附件。谢谢您的合作。
​
This email and the information contained in this email are intended solely for 
the recipient(s) addressed above and may contain information that is 
confidential and/or privileged or whose disclosure is prohibited by law or 
other reasons.
If you are not the intended recipient of this email, please be advised that any 
unauthorized storage, duplication, dissemination, distribution or disclosure of 
all or part of this email is strictly prohibited.
If you received this email in error, please immediately contact NAVER Security 
(dl_naversecur...@navercorp.com) and delete this email and any copies and 
attachments from your system. Thank you for your cooperation.​


Re: [Internet]Re: Re: Some question with Flink state

2022-05-24 Thread 罗凯
老哥,看起来好像没什么问题,相同的key都分配在了同个task,每个task会存在多种key是正常的。key会按最大并行度分成多个key 
group,然后固定的key 
group分配到各个task里。只能保证相同的key会到同一个task,不能保证一个task只有一个key。你这个需求用map-state才合适。map-state的话相当于某些固定的key
 group里面的key都可以通过map-state的user-key去分别存储。

> 2022年5月25日 上午10:45,lxk7...@163.com 写道:
> 
> 图片好像又挂了  我重发下
> hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
> 
> 
> 
>下面是我的代码及测试结果
> 
> 
> 
> 一.使用int类型
> 
> 
> 
>public class KeyByTest {
> 
> 
> 
> public static void main(String[] args) throws Exception {
> 
> 
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
> 
> env.setParallelism(10);
> 
> 
> 
> 
> 
> DataStreamSource dataDataStreamSource = 
> env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
> 
> 
> 
> new data(1, "123", "分类页"),
> 
> 
> 
> new data(2, "r-123", "搜索结果页"),
> 
> 
> 
> new data(1, "r-123", "我的页"),
> 
> 
> 
> new data(3, "r-4567", "搜索结果页")));
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> SingleOutputStreamOperator map = 
> dataDataStreamSource.keyBy(new MyKeySelector())
> 
> 
> 
> .map(new RichMapFunction() {
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String map(data data) throws Exception {
> 
> 
> 
> System.out.println(data.toString() + "的subtask为:" + 
> getRuntimeContext().getIndexOfThisSubtask() );
> 
> 
> 
> return data.toString();
> 
> 
> 
> }
> 
> 
> 
> });
> 
> 
> 
> 
> 
> 
> 
> env.execute("test");
> 
> 
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> class data{
> 
> 
> 
> private int id;
> 
> 
> 
> private String goods;
> 
> 
> 
> private String pageName;
> 
> 
> 
> 
> 
> public data(int id, String goods, String pageName) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> 
> 
> public data() {
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public int getId() {
> 
> 
> 
> return id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setId(int id) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getGoods() {
> 
> 
> 
> return goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setGoods(String goods) {
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getPageName() {
> 
> 
> 
> return pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setPageName(String pageName) {
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String toString() {
> 
> 
> 
> return "data{" +
> 
> 
> 
> "id='" + id + '\'' +
> 
> 
> 
> ", goods='" + goods + '\'' +
> 
> 
> 
> ", pageName='" + pageName + '\'' +
> 
> 
> 
> '}';
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 
> 
> class MyKeySelector implements KeySelector{
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public Integer getKey(data data) throws Exception {
> 
> 
> 
> return data.getId();
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 控制台的输出如下:
> https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
> 
> 
> 
> 可以看见数据根据id分组,分到了不同的subtask上。
> 
> 
> 
> 
> 
> 
> 
> 二.使用String类型  代码如下:
> 
> 
> 
> public class KeyByTest {
> 
> 
> 
> public static void main(String[] args) throws Exception {
> 
> 
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
> 
> env.setParallelism(10);
> 
> 
> 
> 
> 
> DataStreamSource dataDataStreamSource = 
> env.fromCollection(Arrays.asList(new data("1", "123", "首页"),
> 
> 
> 
> new data("1", "123", "分类页"),
> 
> 
> 
> new data("2", "r-123", "搜索结果页"),
> 
> 
> 
> new data("2", "r-123", "我的页"),
> 
> 
> 
> new data("3", "r-4567", "搜索结果页")));
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> SingleOutputStreamOperator map = 
> dataDataStreamSource.keyBy(new MyKeySelector())
> 
> 
> 
> .map(new RichMapFunction() {
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String map(data data) throws Exception {
> 
> 
> 
> System.out.println(data.toString() + "的subtask为:" + 
> getRuntimeContext().getIndexOfThisSubtask() );
> 
> 
> 
> return data.toString();
> 
> 
> 
> }
> 
> 
> 
> });
> 
> 
> 
> 
> 
> 
> 
> env.execute("test");
> 
> 
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> class data{
> 
> 
> 
> private String id;
> 
> 
> 
> private String goods;
> 
> 
> 
> private String pageName;
> 
> 
> 
> 
> 
> public data(String id, String goods, String pageName) {
> 
> 
> 
> this.id = 

Re: accuracy validation of streaming pipeline

2022-05-24 Thread Leonard Xu
Hi, vtygoss

> I'm working on migrating from full-data-pipeline(with spark) to 
> incremental-data-pipeline(with flink cdc), and i met a problem about accuracy 
> validation between pipeline based flink and spark.

Glad to hear that !



> For bounded data, it's simple to validate the two result sets are consitent 
> or not. 
> But, for unbouned data and event-driven application, how to make sure the 
> data stream produced is correct, especially when there are some retract 
> functions with high impactions, e.g. row_number. 
> 
> Is there any document for this preblom?  Thanks for your any suggestions or 
> replies. 

The validation feature belongs data quality scope from my understanding, it’s 
usually provided by the platform e.g. the Data Integration Platform. As the 
underlying pipeline engine/tools, Flink CDC should expose more metrics or data 
quality checking abilities but we didn’t offers them yet, and these 
enhancements is on our roadmap.  Currently, you can use Flink source/sink 
operator’s metric as a rough validation, you can also compare the records count 
in your source database and sink system multiple times for more accurate 
validation.

Best,
Leonard



Re: Re: Some question with Flink state

2022-05-24 Thread lxk7...@163.com
图片好像又挂了  我重发下
hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准



   下面是我的代码及测试结果



    一.使用int类型



   public class KeyByTest {



    public static void main(String[] args) throws Exception {



    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();



    env.setParallelism(10);





    DataStreamSource dataDataStreamSource = 
env.fromCollection(Arrays.asList(new data(1, "123", "首页"),



    new data(1, "123", "分类页"),



    new data(2, "r-123", "搜索结果页"),



    new data(1, "r-123", "我的页"),



    new data(3, "r-4567", "搜索结果页")));











    SingleOutputStreamOperator map = dataDataStreamSource.keyBy(new 
MyKeySelector())



    .map(new RichMapFunction() {





    @Override



    public String map(data data) throws Exception {



    System.out.println(data.toString() + "的subtask为:" + 
getRuntimeContext().getIndexOfThisSubtask() );



    return data.toString();



    }



    });







    env.execute("test");





    }



}



class data{



    private int id;



    private String goods;



    private String pageName;





    public data(int id, String goods, String pageName) {



    this.id = id;



    this.goods = goods;



    this.pageName = pageName;



    }







    public data() {



    }





    public int getId() {



    return id;



    }





    public void setId(int id) {



    this.id = id;



    }





    public String getGoods() {



    return goods;



    }





    public void setGoods(String goods) {



    this.goods = goods;



    }





    public String getPageName() {



    return pageName;



    }





    public void setPageName(String pageName) {



    this.pageName = pageName;



    }





    @Override



    public String toString() {



    return "data{" +



    "id='" + id + '\'' +



    ", goods='" + goods + '\'' +



    ", pageName='" + pageName + '\'' +



    '}';



    }



}





class MyKeySelector implements KeySelector{





    @Override



    public Integer getKey(data data) throws Exception {



    return data.getId();



    }



}



控制台的输出如下:
https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png



可以看见数据根据id分组,分到了不同的subtask上。







二.使用String类型  代码如下:



public class KeyByTest {



    public static void main(String[] args) throws Exception {



    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();



    env.setParallelism(10);





    DataStreamSource dataDataStreamSource = 
env.fromCollection(Arrays.asList(new data("1", "123", "首页"),



    new data("1", "123", "分类页"),



    new data("2", "r-123", "搜索结果页"),



    new data("2", "r-123", "我的页"),



    new data("3", "r-4567", "搜索结果页")));











    SingleOutputStreamOperator map = dataDataStreamSource.keyBy(new 
MyKeySelector())



    .map(new RichMapFunction() {





    @Override



    public String map(data data) throws Exception {



    System.out.println(data.toString() + "的subtask为:" + 
getRuntimeContext().getIndexOfThisSubtask() );



    return data.toString();



    }



    });







    env.execute("test");





    }



}



class data{



    private String id;



    private String goods;



    private String pageName;





    public data(String id, String goods, String pageName) {



    this.id = id;



    this.goods = goods;



    this.pageName = pageName;



    }







    public data() {



    }





    public String getId() {



    return id;



    }





    public void setId(String id) {



    this.id = id;



    }





    public String getGoods() {



    return goods;



    }





    public void setGoods(String goods) {



    this.goods = goods;



    }





    public String getPageName() {



    return pageName;



    }





    public void setPageName(String pageName) {



    this.pageName = pageName;



    }





    @Override



    public String toString() {



    return "data{" +



    "id='" + id + '\'' +



    ", goods='" + goods + '\'' +



    ", pageName='" + pageName + '\'' +



    '}';



    }



}





class MyKeySelector implements KeySelector{





    @Override



    public String getKey(data data) throws Exception {



    return data.getId();



    }



}



最终控制台输出如下:


https://s2.loli.net/2022/05/25/vxKiuX9od6aOTD3.png




可以看见只分了两个组,我不清楚这是否是一个bug.







lxk7...@163.com



 



From: Xuyang



Date: 2022-05-24 21:35

Re: Re: Some question with Flink state

2022-05-24 Thread lxk7...@163.com
hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
   下面是我的代码及测试结果
一.使用int类型
   public class KeyByTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

DataStreamSource dataDataStreamSource = 
env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
new data(1, "123", "分类页"),
new data(2, "r-123", "搜索结果页"),
new data(1, "r-123", "我的页"),
new data(3, "r-4567", "搜索结果页")));




SingleOutputStreamOperator map = dataDataStreamSource.keyBy(new 
MyKeySelector())
.map(new RichMapFunction() {

@Override
public String map(data data) throws Exception {
System.out.println(data.toString() + "的subtask为:" + 
getRuntimeContext().getIndexOfThisSubtask() );
return data.toString();
}
});


env.execute("test");

}
}
class data{
private int id;
private String goods;
private String pageName;

public data(int id, String goods, String pageName) {
this.id = id;
this.goods = goods;
this.pageName = pageName;
}


public data() {
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getGoods() {
return goods;
}

public void setGoods(String goods) {
this.goods = goods;
}

public String getPageName() {
return pageName;
}

public void setPageName(String pageName) {
this.pageName = pageName;
}

@Override
public String toString() {
return "data{" +
"id='" + id + '\'' +
", goods='" + goods + '\'' +
", pageName='" + pageName + '\'' +
'}';
}
}

class MyKeySelector implements KeySelector{

@Override
public Integer getKey(data data) throws Exception {
return data.getId();
}
}
控制台的输出如下:
可以看见数据根据id分组,分到了不同的subtask上。


二.使用String类型  代码如下:
public class KeyByTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

DataStreamSource dataDataStreamSource = 
env.fromCollection(Arrays.asList(new data("1", "123", "首页"),
new data("1", "123", "分类页"),
new data("2", "r-123", "搜索结果页"),
new data("2", "r-123", "我的页"),
new data("3", "r-4567", "搜索结果页")));




SingleOutputStreamOperator map = dataDataStreamSource.keyBy(new 
MyKeySelector())
.map(new RichMapFunction() {

@Override
public String map(data data) throws Exception {
System.out.println(data.toString() + "的subtask为:" + 
getRuntimeContext().getIndexOfThisSubtask() );
return data.toString();
}
});


env.execute("test");

}
}
class data{
private String id;
private String goods;
private String pageName;

public data(String id, String goods, String pageName) {
this.id = id;
this.goods = goods;
this.pageName = pageName;
}


public data() {
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getGoods() {
return goods;
}

public void setGoods(String goods) {
this.goods = goods;
}

public String getPageName() {
return pageName;
}

public void setPageName(String pageName) {
this.pageName = pageName;
}

@Override
public String toString() {
return "data{" +
"id='" + id + '\'' +
", goods='" + goods + '\'' +
", pageName='" + pageName + '\'' +
'}';
}
}

class MyKeySelector implements KeySelector{

@Override
public String getKey(data data) throws Exception {
return data.getId();
}
}
最终控制台输出如下:


可以看见只分了两个组,我不清楚这是否是一个bug.


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 21:35
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
我不确定但大概率是两次keyby只以后面那个为准,所以可能会导致你前面的keyby其实是无用的(可以试验下)。可以按你说的方式将数据中这两个key拼成一个string当作shuffle的key。
在 2022-05-24 21:06:58,"lxk7...@163.com"  写道:
>如果是两次keyby的问题,我可以直接在一次keyby里将两个数据给拼接成字符串,这样的方式是跟两次keyby效果一样吗?
>
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:51
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
>在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>>

Re: accuracy validation of streaming pipeline

2022-05-24 Thread Shengkai Fang
Hi, all.

>From my understanding, the accuracy for the sync pipeline requires to
snapshot the source and sink at some points.  It is just like we have a
checkpoint that contains all the data at some time for both sink and
source. Then we can compare the content in the checkpoint and find the
difference.

The main problem is how can we snapshot the data in the source/sink or
provide some meaningful metrics to compare at the points.

Best,
Shengkai

Xuyang  于2022年5月24日周二 21:32写道:

> I think for an unbounded data, we can only check the result at one point
> of time, that is the work what Watermark[1] does. What about tag one time
> and to validate the data accuracy at that moment?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#watermark
>
> 在 2022-05-20 16:02:39,"vtygoss"  写道:
>
> Hi community!
>
>
> I'm working on migrating from full-data-pipeline(with spark) to
> incremental-data-pipeline(with flink cdc), and i met a problem about
> accuracy validation between pipeline based flink and spark.
>
>
> For bounded data, it's simple to validate the two result sets are
> consitent or not.
>
> But, for unbouned data and event-driven application, how to make sure the
> data stream produced is correct, especially when there are some retract
> functions with high impactions, e.g. row_number.
>
>
> Is there any document for this preblom?  Thanks for your any suggestions
> or replies.
>
>
> Best Regards!
>
>


Re: Application mode deployment through API call

2022-05-24 Thread Shengkai Fang
Hi, Peter.

I am not sure whether this doc is enough or not. The doc[1] lists all the
available REST API in the Flink runtime now. You can use the RestClient[2]
to send request to the JM for later usage.

Best,
Shengkai

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
[2]
https://github.com/apache/flink/blob/646ff2d36f40704f5dca017b8fffed78bd51b307/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java

Peter Schrott  于2022年5月24日周二 19:52写道:

> Hi Vikash,
>
> Could you be more precise about the shared libraries? Is there any
> documentation about this?
>
> Thanks, Peter
>
> On Tue, May 24, 2022 at 1:23 PM Vikash Dat  wrote:
>
>> Similar to agent Biao, Application mode is okay if you only have a single
>> app, but when running multiple apps session mode is better for control. In
>> my experience, the CLIFrontend is not as robust as the REST API, or you
>> will end up having to rebuild a very similar Rest API. For the meta space
>> issue, have you tried adding shared libraries to the flink lib folder?
>>
>> On Mon, May 23, 2022 at 23:31 Shengkai Fang  wrote:
>>
>>> Hi, all.
>>>
>>> > is there any plan in the Flink community to provide an easier way of
>>> deploying Flink with application mode on YARN
>>>
>>> Yes. Jark has already opened a ticket about how to use the sql client to
>>> submit the SQL in application mode[1]. What's more, in FLIP-222 we are able
>>> to manage the jobs in SQL, which will list all submitted jobs and their web
>>> UI[2].
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-26541
>>> [2]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-222%3A+Support+full+query+lifecycle+statements+in+SQL+client
>>>
>>>
>>>


Re:Re:Re:Re:Re:在自定义表聚合函数 TableAggregateFunction 使用 emitUpdateWithRetract 异常

2022-05-24 Thread sjf0115
好的 谢谢
在 2022-05-24 21:23:56,"Xuyang"  写道:
>Hi, 
>我debug到了代码里,似乎是个bug。如果使用OldPlanner的话,emitValue和emitUpdateWithRetract仅需定义一个就可以了,并且emitUpdateWithRetract的优先级大于emitValue。但是在Blink
> Planner里,只看有没有定义emitValue。你可以去Flink issue上提一下这个bug
>在 2022-05-23 18:24:17,"sjf0115"  写道:
>>Flink 版本:1.13.5
>>
>>
>>
>>
>>函数完整代码如下:
>>```
>>public class Top2RetractTableAggregateFunction extends 
>>TableAggregateFunction, 
>>Top2RetractTableAggregateFunction.Top2RetractAccumulator> {
>>private static final Logger LOG = 
>> LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);
>>// Top2 聚合中间结果数据结构
>>public static class Top2RetractAccumulator {
>>public long beforeFirst = 0;
>>public long beforeSecond = 0;
>>public long afterFirst = 0;
>>public long afterSecond = 0;
>>}
>>
>>
>>// 创建 Top2Accumulator 累加器并做初始化
>>@Override
>>public Top2RetractAccumulator createAccumulator() {
>>LOG.info("[INFO] createAccumulator ...");
>>Top2RetractAccumulator acc = new Top2RetractAccumulator();
>>acc.beforeFirst = Integer.MIN_VALUE;
>>acc.beforeSecond = Integer.MIN_VALUE;
>>acc.afterFirst = Integer.MIN_VALUE;
>>acc.afterSecond = Integer.MIN_VALUE;
>>return acc;
>>}
>>
>>
>>// 接收输入元素并累加到 Accumulator 数据结构
>>public void accumulate(Top2RetractAccumulator acc, Long value) {
>>LOG.info("[INFO] accumulate ...");
>>if (value > acc.afterFirst) {
>>acc.afterSecond = acc.afterFirst;
>>acc.afterFirst = value;
>>} else if (value > acc.afterSecond) {
>>acc.afterSecond = value;
>>}
>>}
>>
>>
>>// 带撤回的输出
>>public void emitUpdateWithRetract(Top2RetractAccumulator acc, 
>> RetractableCollector> out) {
>>LOG.info("[INFO] emitUpdateWithRetract ...");
>>if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {
>>// 撤回旧记录
>>if (acc.beforeFirst != Integer.MIN_VALUE) {
>>out.retract(Tuple2.of(acc.beforeFirst, 1));
>>}
>>// 输出新记录
>>out.collect(Tuple2.of(acc.afterFirst, 1));
>>acc.beforeFirst = acc.afterFirst;
>>}
>>if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) {
>>// 撤回旧记录
>>if (acc.beforeSecond != Integer.MIN_VALUE) {
>>out.retract(Tuple2.of(acc.beforeSecond, 2));
>>}
>>// 输出新记录
>>out.collect(Tuple2.of(acc.afterSecond, 2));
>>acc.beforeSecond = acc.afterSecond;
>>}
>>}
>>}
>>```
>>调用完整代码如下:
>>```
>>// 执行环境
>>StreamExecutionEnvironment env = 
>>StreamExecutionEnvironment.getExecutionEnvironment();
>>env.setParallelism(1);
>>EnvironmentSettings settings = EnvironmentSettings
>>.newInstance()
>>.useOldPlanner() // Blink Planner 异常 Old Planner 可以
>>.inStreamingMode()
>>.build();
>>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>
>>
>>DataStream sourceStream = env.fromElements(
>>Row.of("李雷", "语文", 78),
>>Row.of("韩梅梅", "语文", 50),
>>Row.of("李雷", "语文", 99),
>>Row.of("韩梅梅", "语文", 80),
>>Row.of("李雷", "英语", 90),
>>Row.of("韩梅梅", "英语", 40),
>>Row.of("李雷", "英语", 98),
>>Row.of("韩梅梅", "英语", 88)
>>);
>>
>>
>>// 注册虚拟表
>>tEnv.createTemporaryView("stu_score", sourceStream, $("name"), $("course"), 
>>$("score"));
>>// 注册临时i系统函数
>>tEnv.createTemporarySystemFunction("Top2", new 
>>Top2RetractTableAggregateFunction());
>>// 调用函数
>>tEnv.from("stu_score")
>>.groupBy($("course"))
>>.flatAggregate(call("Top2", $("score")).as("score", "rank"))
>>.select($("course"), $("score"), $("rank"))
>>.execute()
>>.print();
>>```
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2022-05-23 18:21:42,"sjf0115"  写道:
>>>函数代码如下:```public class Top2RetractTableAggregateFunction extends 
>>>TableAggregateFunctionTuple2Long, Integer, 
>>>Top2RetractTableAggregateFunction.Top2RetractAccumulator {
>>>private static final Logger LOG = 
>>>LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);// 
>>>Top2 聚合中间结果数据结构public static class Top2RetractAccumulator {
>>>public long beforeFirst = 0;public long beforeSecond = 
>>>0;public long afterFirst = 0;public long 
>>>afterSecond = 0;}// 创建 Top2Accumulator 累加器并做初始化  
>>>  @Overridepublic Top2RetractAccumulator createAccumulator() { 
>>>   LOG.info("[INFO] createAccumulator 
>>>...");Top2RetractAccumulator acc = new 
>>>Top2RetractAccumulator();acc.beforeFirst = 
>>>Integer.MIN_VALUE;acc.beforeSecond = Integer.MIN_VALUE;
>>>acc.afterFirst = Integer.MIN_VALUE;acc.afterSecond = 
>>>Integer.MIN_VALUE;return acc;}// 
>>>接收输入元素并累加到 Accumulator 数据结构  

Re: Flink Job Manager unable to recognize Task Manager Available slots

2022-05-24 Thread Teoh, Hong
Hi Sunitha,

Without more information about your setup, I would assume you are trying to 
return JobManager (and HA setup) into a stable state. A couple of questions:

  *   Since your job is cancelled, I would assume that the current job’s HA 
state is not important, so we can delete the checkpoint pointer and data.
  *   Are there other jobs running on the same cluster whose HA state you want 
to salvage?

I can think of the following options:

  1.  If there are no other jobs running on the same cluster, and the HA state 
is not important, the easiest way is to totally replace your Zookeeper 
instances. (this will start the JobManager afresh, but will cause the HA state 
for all other jobs running on the same cluster to be lost)
  2.  Manually clear the Zookeeper HA state for the problematic job. This will 
keep the HA state of other jobs running on the same cluster.

To perform step 2, see below:
The zookeeper stores “Active” jobs in a znode hierarchy as shown below (You can 
imagine this like a pseudo file system). I am assuming the jobid you have 
pasted in logs.


  *   /flink/default/running_job_registry/3a97d1d50f663027ae81efe0f0aa
This has the status of the job (e.g. RUNNING)

  *   /flink/default/leader/resource_manager_lock
This has the information about which JM has the ResourceManager (which is the 
component responsible for registering the task slots in the cluster

There are other znodes as well, which are all interesting (e.g. 
/flink/default/checkpoints, /flink/default/checkpoint-counter), but I’ve 
highlighted the relevant ones.

To clear this, you can simply log unto your zookeeper nodes, and delete the 
znodes. The JobManager will repopulate them when the job starts up.

  1.  Log unto your zookeeper nodes (e.g. execute into your zookeeper container)
  2.  Execute the zookeeper CLI. This usually comes prepackaged with zookeeper, 
and you can simply run the pre-packaged script bin/zkCli.sh.

Explore the pseudo-file system by doing ls or get (e.g. ls /flink/default )

  3.  You can delete the znodes associated to your job

rmr /flink/default/running_job_registry/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/jobgraphs/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/checkpoints/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/checkpoint-counter/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/leaderlatch/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/leader/3a97d1d50f663027ae81efe0f0aa

This should result in your JobManager recovering from the faulty job.

Regards,
Hong






From: "s_penakalap...@yahoo.com" 
Date: Tuesday, 24 May 2022 at 18:40
To: User 
Subject: RE: [EXTERNAL]Flink Job Manager unable to recognize Task Manager 
Available slots


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Hi Team,

Any inputs please badly stuck.

Regards,
Sunitha

On Sunday, May 22, 2022, 12:34:22 AM GMT+5:30, s_penakalap...@yahoo.com 
 wrote:


Hi All,

Help please!

We have standalone Flink service installed in individual VM and clubed to form 
a cluster with HA and checkpoint in place. When cancelling Job, Flink cluster 
went down and its unable to start up normally as Job manager is continuously 
going down with the below error:

2022-05-21 14:33:09,314 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 
3a97d1d50f663027ae81efe0f0aa.

Each attempt to restart cluster failed with the same error so the whole cluster 
became unrecoverable and not operating, please help on the below points:
1> In which Fink/zookeeper folder job recovery details are stored and how can 
we clear all old job instance so that Flink cluster will not try to recover and 
start fresh to manually submit all job.

2> Since cluster is HA, we have 2 Job manager's even though one JM is going 
down Flink is started but available slots are showing up as 0 (task manager's 
are up but not displayed in web UI).

Regards
Sunitha.



Re: Source vs SourceFunction and testing

2022-05-24 Thread Ken Krugler
Hi Piotr,

Yes, that should work (using DataStream as the common result of both 
source creation options)

— Ken

> On May 24, 2022, at 12:19 PM, Piotr Domagalski  wrote:
> 
> Hi Ken,
> 
> Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, 
> navigating the type system and being still confused about differences between 
> Source, SourceFunction, DataStream, DataStreamOperator, etc. 
> 
> I think the DataStream<> type is what I'm looking for? That is, then I can 
> use:
> 
> DataStream source = env.fromSource(getKafkaSource(params), 
> watermarkStrategy, "Kafka");
> when using KafkaSource in the normal setup
> 
> and
> DataStream s = env.addSource(new ParallelTestSource<>(...));
> when using the testing source [1]
> 
> Does that sound right?
> 
> [1] 
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
>  
> 
> On Tue, May 24, 2022 at 7:57 PM Ken Krugler  > wrote:
> Hi Piotr,
> 
> The way I handle this is via a workflow class that uses a builder approach to 
> specifying inputs, outputs, and any other configuration settings.
> 
> The inputs are typically DataStream.
> 
> This way I can separate out the Kafka inputs, and use testing sources that 
> give me very precise control over the inputs (e.g. I can hold up on right 
> side data to ensure my stateful left join junction is handling deferred joins 
> properly). I can also use Kafka unit test support (either kafka-junit or 
> Spring embedded Kafka) if needed.
> 
> Then in the actual tool class (with a main method) I’ll wire up the real 
> Kafka sources, with whatever logic is required to convert the consumer 
> records to what the workflow is expecting.
> 
> — Ken
> 
>> On May 24, 2022, at 8:34 AM, Piotr Domagalski > > wrote:
>> 
>> Hi,
>> 
>> I'm wondering: what ithe recommended way to structure the job which one 
>> would like to test later on with `MiniCluster`.
>> 
>> I've looked at the flink-training repository examples [1] and they tend to 
>> expose the main job as a class that accepts a `SourceFunction` and a 
>> `SinkFunction`, which make sense. But then, my job is normally constructed 
>> with `KafkaSource` which is then passed to `env.fromSource(...`.
>> 
>> Is there any recommended way of handling these discrepancies, ie. having to 
>> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>> 
>> [1] 
>> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>>  
>> 
>> 
>> -- 
>> Piotr Domagalski
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com 
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 
> 
> 
> -- 
> Piotr Domagalski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Source vs SourceFunction and testing

2022-05-24 Thread Piotr Domagalski
Hi Ken,

Thanks Ken. I guess the problem I had was, as a complete newbie to Flink,
navigating the type system and being still confused about differences
between Source, SourceFunction, DataStream, DataStreamOperator, etc.

I think the DataStream<> type is what I'm looking for? That is, then I can
use:

DataStream source = env.fromSource(getKafkaSource(params),
watermarkStrategy, "Kafka");
when using KafkaSource in the normal setup

and
DataStream s = env.addSource(new ParallelTestSource<>(...));
when using the testing source [1]

Does that sound right?

[1]
https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26

On Tue, May 24, 2022 at 7:57 PM Ken Krugler 
wrote:

> Hi Piotr,
>
> The way I handle this is via a workflow class that uses a builder approach
> to specifying inputs, outputs, and any other configuration settings.
>
> The inputs are typically DataStream.
>
> This way I can separate out the Kafka inputs, and use testing sources that
> give me very precise control over the inputs (e.g. I can hold up on right
> side data to ensure my stateful left join junction is handling deferred
> joins properly). I can also use Kafka unit test support (either kafka-junit
> or Spring embedded Kafka) if needed.
>
> Then in the actual tool class (with a main method) I’ll wire up the real
> Kafka sources, with whatever logic is required to convert the consumer
> records to what the workflow is expecting.
>
> — Ken
>
> On May 24, 2022, at 8:34 AM, Piotr Domagalski 
> wrote:
>
> Hi,
>
> I'm wondering: what ithe recommended way to structure the job which one
> would like to test later on with `MiniCluster`.
>
> I've looked at the flink-training repository examples [1] and they tend to
> expose the main job as a class that accepts a `SourceFunction` and a
> `SinkFunction`, which make sense. But then, my job is normally constructed
> with `KafkaSource` which is then passed to `env.fromSource(...`.
>
> Is there any recommended way of handling these discrepancies, ie. having
> to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>
> [1]
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>
> --
> Piotr Domagalski
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>

-- 
Piotr Domagalski


Flink DataStream and remote Stateful Functions interoperability

2022-05-24 Thread Himanshu Sareen
Team,

I'm working on a POC where our existing Stateful Functions ( remote ) can 
interact with Datastream API.
https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/flink-datastream/

I started Flink cluster - ./bin/start-cluster.sh
Then I submitted the .jar to Flink.

However, on submitting only Embedded function is called by Datastream code.

I'm unable to invoke stateful functions as module.yaml is not loaded.

Can someone help me in understanding how can we deploy Stateful function code 
(module.yaml) and Datastream api code parllely on Flink cluster.


Regards
Himanshu



Re: Source vs SourceFunction and testing

2022-05-24 Thread Ken Krugler
Hi Piotr,

The way I handle this is via a workflow class that uses a builder approach to 
specifying inputs, outputs, and any other configuration settings.

The inputs are typically DataStream.

This way I can separate out the Kafka inputs, and use testing sources that give 
me very precise control over the inputs (e.g. I can hold up on right side data 
to ensure my stateful left join junction is handling deferred joins properly). 
I can also use Kafka unit test support (either kafka-junit or Spring embedded 
Kafka) if needed.

Then in the actual tool class (with a main method) I’ll wire up the real Kafka 
sources, with whatever logic is required to convert the consumer records to 
what the workflow is expecting.

— Ken

> On May 24, 2022, at 8:34 AM, Piotr Domagalski  wrote:
> 
> Hi,
> 
> I'm wondering: what ithe recommended way to structure the job which one would 
> like to test later on with `MiniCluster`.
> 
> I've looked at the flink-training repository examples [1] and they tend to 
> expose the main job as a class that accepts a `SourceFunction` and a 
> `SinkFunction`, which make sense. But then, my job is normally constructed 
> with `KafkaSource` which is then passed to `env.fromSource(...`.
> 
> Is there any recommended way of handling these discrepancies, ie. having to 
> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> 
> [1] 
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>  
> 
> 
> -- 
> Piotr Domagalski

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





Re: Flink Job Manager unable to recognize Task Manager Available slots

2022-05-24 Thread s_penakalap...@yahoo.com
 Hi Team,
Any inputs please badly stuck.
Regards,Sunitha
On Sunday, May 22, 2022, 12:34:22 AM GMT+5:30, s_penakalap...@yahoo.com 
 wrote:  
 
 Hi All,
Help please!
We have standalone Flink service installed in individual VM and clubed to form 
a cluster with HA and checkpoint in place. When cancelling Job, Flink cluster 
went down and its unable to start up normally as Job manager is continuously 
going down with the below error:
2022-05-21 14:33:09,314 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
occurred in the cluster entrypoint.java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 
3a97d1d50f663027ae81efe0f0aa.
Each attempt to restart cluster failed with the same error so the whole cluster 
became unrecoverable and not operating, please help on the below points:1> In 
which Fink/zookeeper folder job recovery details are stored and how can we 
clear all old job instance so that Flink cluster will not try to recover and 
start fresh to manually submit all job.
2> Since cluster is HA, we have 2 Job manager's even though one JM is going 
down Flink is started but available slots are showing up as 0 (task manager's 
are up but not displayed in web UI).
RegardsSunitha.
  

Re: TolerableCheckpointFailureNumber not always applying

2022-05-24 Thread Gaël Renoux
I get the idea, but in our case this was a transient error: it was a
network issue, which was solved later without any change in Flink (see last
line of stack-trace). Errors in the sync phase are not always non-transient
(in our case, they are pretty much never).

To be honest, I have trouble imagining a case in Production where you'd
want the job to fail if a checkpoint fails. In a test environment, sure,
you want to crash as soon as possible if something goes wrong. But in
Production? I'd rather continue working as long as my job can work. Sure,
I'm in trouble if the job crashes and I don't have a recent checkpoint -
but then, crashing exactly when I don't have a checkpoint is the worst
thing that can happen.

So I think the safest solution (when Flink is in doubt about whether it's
transient or not) would be to assume the error is transient and apply the
tolerable failures configuration. In that case, the worst case scenario is
that your job goes on, and you have to verify your checkpoints to see if
everything is alright - which is something you should always do anyway, in
case you have a transient error that's not going away.



On Tue, May 24, 2022 at 6:04 AM Hangxiang Yu  wrote:

> In my opinion,  some exceptions in the async phase like timeout may happen
> related to the network, state size which will change, so maybe next time
> these failures will not occur. So the config makes sense for these.
> But this failure in the sync phase usually means the program will always
> fail and it will influence the. normal procedure, it has to be stopped.
> If you don't need to recover from the checkpoint, maybe you could disable
> it. But it's not recommended for a streaming job.
>
> Best,
> Hangxiang.
>
> On Tue, May 24, 2022 at 12:51 AM Gaël Renoux 
> wrote:
>
>> Got it, thank you. I misread the documentation and thought the async
>> referred to the task itself, not the process of taking a checkpoint.
>>
>> I guess there is currently no way to make a job never fail on a failed
>> checkpoint?
>>
>> Gaël Renoux - Lead R Engineer
>> E - gael.ren...@datadome.co
>> W - www.datadome.co
>>
>>
>>
>> On Mon, May 23, 2022 at 4:35 PM Hangxiang Yu  wrote:
>>
>>> Hi, Gaël Renoux.
>>> As you could see in [1], There are some descriptions about the config:
>>> "This only applies to the following failure reasons: IOException on the
>>> Job Manager, failures in the async phase on the Task Managers and
>>> checkpoint expiration due to a timeout. Failures originating from the sync
>>> phase on the Task Managers are always forcing failover of an affected task.
>>> Other types of checkpoint failures (such as checkpoint being subsumed) are
>>> being ignored."
>>>
>>> From the stack-trace, I see the exception is thrown in the sync phase.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing
>>>
>>> Best,
>>> Hangxiang.
>>>
>>> On Mon, May 23, 2022 at 5:18 PM Gaël Renoux 
>>> wrote:
>>>
 Hello everyone,

 We're having an issue on our Flink job: it restarted because it failed
 a checkpoint, even though it shouldn't have. We've set the
 tolerableCheckpointFailureNumber to 1 million to never have the job restart
 because of this. However, the job did restart following a checkpoint
 failure in a Kafka sink (stack-trace below).

 I'm about to open an issue on Flink's Jira, but I thought I'd check if
 I'm missing something first. Is there a known limitation somewhere? Or
 should the tolerableCheckpointFailureNumber apply in that case?

 The stack-trace:

 Sink: result-to-kafka (1/1)#0 (0dbd01ad4663f5dd642569381694f57e)
> switched from RUNNING to FAILED with failure cause: java.io.IOException:
> Could not perform checkpoint 853 for operator Sink: result-to-kafka 
> (1/1)#0.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.triggerCheckpointOnAligned(CheckpointBarrierTracker.java:301)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:141)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> 

Re: Source vs SourceFunction and testing

2022-05-24 Thread Aeden Jameson
Depending on the kind of testing you're hoping to do you may want to
look into https://github.com/mguenther/kafka-junit. For example,
you're looking for some job level smoke tests that just answer the
question "Is everything wired up correctly?"  Personally, I like how
this approach doesn't require you to open up the design for the sake
of testing.


On Tue, May 24, 2022 at 8:34 AM Piotr Domagalski  wrote:
>
> Hi,
>
> I'm wondering: what ithe recommended way to structure the job which one would 
> like to test later on with `MiniCluster`.
>
> I've looked at the flink-training repository examples [1] and they tend to 
> expose the main job as a class that accepts a `SourceFunction` and a 
> `SinkFunction`, which make sense. But then, my job is normally constructed 
> with `KafkaSource` which is then passed to `env.fromSource(...`.
>
> Is there any recommended way of handling these discrepancies, ie. having to 
> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>
> [1] 
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>
> --
> Piotr Domagalski



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj


Source vs SourceFunction and testing

2022-05-24 Thread Piotr Domagalski
Hi,

I'm wondering: what ithe recommended way to structure the job which one
would like to test later on with `MiniCluster`.

I've looked at the flink-training repository examples [1] and they tend to
expose the main job as a class that accepts a `SourceFunction` and a
`SinkFunction`, which make sense. But then, my job is normally constructed
with `KafkaSource` which is then passed to `env.fromSource(...`.

Is there any recommended way of handling these discrepancies, ie. having to
use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?

[1]
https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61

-- 
Piotr Domagalski


Re:Re: Re: Some question with Flink state

2022-05-24 Thread Xuyang
我不确定但大概率是两次keyby只以后面那个为准,所以可能会导致你前面的keyby其实是无用的(可以试验下)。可以按你说的方式将数据中这两个key拼成一个string当作shuffle的key。
在 2022-05-24 21:06:58,"lxk7...@163.com"  写道:
>如果是两次keyby的问题,我可以直接在一次keyby里将两个数据给拼接成字符串,这样的方式是跟两次keyby效果一样吗?
>
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:51
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
>在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>>
>>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>>
>>这样呢
>>
>>
>>lxk7...@163.com
>> 
>>From: Xuyang
>>Date: 2022-05-24 20:17
>>To: user-zh
>>Subject: Re:Re: Re: Some question with Flink state
>>Hi, 你的图还是挂了,可以使用图床工具试一下
>> 
>> 
>> 
>>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
>> 
>>图片好像有点问题,重新上传一下
>>lxk7...@163.com
>>From: Hangxiang Yu
>>Date: 2022-05-24 12:09
>>To: user-zh
>>Subject: Re: Re: Some question with Flink state
>>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>>
>>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>>
>>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>>
>>>
>>>
>>> lxk7...@163.com
>>>
>>> From: Hangxiang Yu
>>> Date: 2022-05-23 23:09
>>> To: user-zh; lxk7491
>>> Subject: Re: Some question with Flink state
>>> Hello,
>>> All states will not be shared in different parallelisms.
>>> BTW, English questions could be sent to u...@flink.apache.org.
>>>
>>> Best,
>>> Hangxiang.
>>>
>>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>>
>>> >
>>> > Hi everyone
>>> >I was used Flink keyed-state in my Project.But I found some questions
>>> > that make me confused.
>>> >when I used value-state in multi parallelism  the value is not I
>>> wanted.
>>> >So I guess that value-state is in every parallelism. every parallelism
>>> > saved their only value  which means the value is Thread-Level
>>> >But when I used map-state,the value is correctly. I mean the map-state
>>> > was shared by every parallelism.
>>> >   looking forward to your reply
>>> >
>>> >
>>> > lxk7...@163.com
>>> >
>>>


Re:accuracy validation of streaming pipeline

2022-05-24 Thread Xuyang
I think for an unbounded data, we can only check the result at one point of 
time, that is the work what Watermark[1] does. What about tag one time and to 
validate the data accuracy at that moment?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#watermark



在 2022-05-20 16:02:39,"vtygoss"  写道:

Hi community!




I'm working on migrating from full-data-pipeline(with spark) to 
incremental-data-pipeline(with flink cdc), and i met a problem about accuracy 
validation between pipeline based flink and spark.




For bounded data, it's simple to validate the two result sets are consitent or 
not. 

But, for unbouned data and event-driven application, how to make sure the data 
stream produced is correct, especially when there are some retract functions 
with high impactions, e.g. row_number. 




Is there any document for this preblom?  Thanks for your any suggestions or 
replies. 




Best Regards! 

Re:Re:Re:Re:在自定义表聚合函数 TableAggregateFunction 使用 emitUpdateWithRetract 异常

2022-05-24 Thread Xuyang
Hi, 
我debug到了代码里,似乎是个bug。如果使用OldPlanner的话,emitValue和emitUpdateWithRetract仅需定义一个就可以了,并且emitUpdateWithRetract的优先级大于emitValue。但是在Blink
 Planner里,只看有没有定义emitValue。你可以去Flink issue上提一下这个bug
在 2022-05-23 18:24:17,"sjf0115"  写道:
>Flink 版本:1.13.5
>
>
>
>
>函数完整代码如下:
>```
>public class Top2RetractTableAggregateFunction extends 
>TableAggregateFunction, 
>Top2RetractTableAggregateFunction.Top2RetractAccumulator> {
>private static final Logger LOG = 
> LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);
>// Top2 聚合中间结果数据结构
>public static class Top2RetractAccumulator {
>public long beforeFirst = 0;
>public long beforeSecond = 0;
>public long afterFirst = 0;
>public long afterSecond = 0;
>}
>
>
>// 创建 Top2Accumulator 累加器并做初始化
>@Override
>public Top2RetractAccumulator createAccumulator() {
>LOG.info("[INFO] createAccumulator ...");
>Top2RetractAccumulator acc = new Top2RetractAccumulator();
>acc.beforeFirst = Integer.MIN_VALUE;
>acc.beforeSecond = Integer.MIN_VALUE;
>acc.afterFirst = Integer.MIN_VALUE;
>acc.afterSecond = Integer.MIN_VALUE;
>return acc;
>}
>
>
>// 接收输入元素并累加到 Accumulator 数据结构
>public void accumulate(Top2RetractAccumulator acc, Long value) {
>LOG.info("[INFO] accumulate ...");
>if (value > acc.afterFirst) {
>acc.afterSecond = acc.afterFirst;
>acc.afterFirst = value;
>} else if (value > acc.afterSecond) {
>acc.afterSecond = value;
>}
>}
>
>
>// 带撤回的输出
>public void emitUpdateWithRetract(Top2RetractAccumulator acc, 
> RetractableCollector> out) {
>LOG.info("[INFO] emitUpdateWithRetract ...");
>if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {
>// 撤回旧记录
>if (acc.beforeFirst != Integer.MIN_VALUE) {
>out.retract(Tuple2.of(acc.beforeFirst, 1));
>}
>// 输出新记录
>out.collect(Tuple2.of(acc.afterFirst, 1));
>acc.beforeFirst = acc.afterFirst;
>}
>if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) {
>// 撤回旧记录
>if (acc.beforeSecond != Integer.MIN_VALUE) {
>out.retract(Tuple2.of(acc.beforeSecond, 2));
>}
>// 输出新记录
>out.collect(Tuple2.of(acc.afterSecond, 2));
>acc.beforeSecond = acc.afterSecond;
>}
>}
>}
>```
>调用完整代码如下:
>```
>// 执行环境
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>env.setParallelism(1);
>EnvironmentSettings settings = EnvironmentSettings
>.newInstance()
>.useOldPlanner() // Blink Planner 异常 Old Planner 可以
>.inStreamingMode()
>.build();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>
>
>DataStream sourceStream = env.fromElements(
>Row.of("李雷", "语文", 78),
>Row.of("韩梅梅", "语文", 50),
>Row.of("李雷", "语文", 99),
>Row.of("韩梅梅", "语文", 80),
>Row.of("李雷", "英语", 90),
>Row.of("韩梅梅", "英语", 40),
>Row.of("李雷", "英语", 98),
>Row.of("韩梅梅", "英语", 88)
>);
>
>
>// 注册虚拟表
>tEnv.createTemporaryView("stu_score", sourceStream, $("name"), $("course"), 
>$("score"));
>// 注册临时i系统函数
>tEnv.createTemporarySystemFunction("Top2", new 
>Top2RetractTableAggregateFunction());
>// 调用函数
>tEnv.from("stu_score")
>.groupBy($("course"))
>.flatAggregate(call("Top2", $("score")).as("score", "rank"))
>.select($("course"), $("score"), $("rank"))
>.execute()
>.print();
>```
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2022-05-23 18:21:42,"sjf0115"  写道:
>>函数代码如下:```public class Top2RetractTableAggregateFunction extends 
>>TableAggregateFunctionTuple2Long, Integer, 
>>Top2RetractTableAggregateFunction.Top2RetractAccumulator {
>>private static final Logger LOG = 
>>LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);// 
>>Top2 聚合中间结果数据结构public static class Top2RetractAccumulator { 
>>   public long beforeFirst = 0;public long beforeSecond = 
>>0;public long afterFirst = 0;public long 
>>afterSecond = 0;}// 创建 Top2Accumulator 累加器并做初始化   
>> @Overridepublic Top2RetractAccumulator createAccumulator() {   
>> LOG.info("[INFO] createAccumulator ...");   
>> Top2RetractAccumulator acc = new Top2RetractAccumulator();
>>acc.beforeFirst = Integer.MIN_VALUE;acc.beforeSecond = 
>>Integer.MIN_VALUE;acc.afterFirst = Integer.MIN_VALUE;   
>> acc.afterSecond = Integer.MIN_VALUE;return acc;
>>}// 接收输入元素并累加到 Accumulator 数据结构public void 
>>accumulate(Top2RetractAccumulator acc, Long value) {
>>LOG.info("[INFO] accumulate ...");if 
>>(value  acc.afterFirst) {

Re: Re: Some question with Flink state

2022-05-24 Thread lxk7...@163.com
如果是两次keyby的问题,我可以直接在一次keyby里将两个数据给拼接成字符串,这样的方式是跟两次keyby效果一样吗?



lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>这样呢
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:17
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
> 
>图片好像有点问题,重新上传一下
>lxk7...@163.com
>From: Hangxiang Yu
>Date: 2022-05-24 12:09
>To: user-zh
>Subject: Re: Re: Some question with Flink state
>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>
>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>
>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>
>>
>>
>> lxk7...@163.com
>>
>> From: Hangxiang Yu
>> Date: 2022-05-23 23:09
>> To: user-zh; lxk7491
>> Subject: Re: Some question with Flink state
>> Hello,
>> All states will not be shared in different parallelisms.
>> BTW, English questions could be sent to u...@flink.apache.org.
>>
>> Best,
>> Hangxiang.
>>
>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>
>> >
>> > Hi everyone
>> >I was used Flink keyed-state in my Project.But I found some questions
>> > that make me confused.
>> >when I used value-state in multi parallelism  the value is not I
>> wanted.
>> >So I guess that value-state is in every parallelism. every parallelism
>> > saved their only value  which means the value is Thread-Level
>> >But when I used map-state,the value is correctly. I mean the map-state
>> > was shared by every parallelism.
>> >   looking forward to your reply
>> >
>> >
>> > lxk7...@163.com
>> >
>>


Re: Re: Some question with Flink state

2022-05-24 Thread lxk7...@163.com
好的,我会尝试去弄一下。


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>这样呢
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:17
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
> 
>图片好像有点问题,重新上传一下
>lxk7...@163.com
>From: Hangxiang Yu
>Date: 2022-05-24 12:09
>To: user-zh
>Subject: Re: Re: Some question with Flink state
>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>
>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>
>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>
>>
>>
>> lxk7...@163.com
>>
>> From: Hangxiang Yu
>> Date: 2022-05-23 23:09
>> To: user-zh; lxk7491
>> Subject: Re: Some question with Flink state
>> Hello,
>> All states will not be shared in different parallelisms.
>> BTW, English questions could be sent to u...@flink.apache.org.
>>
>> Best,
>> Hangxiang.
>>
>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>
>> >
>> > Hi everyone
>> >I was used Flink keyed-state in my Project.But I found some questions
>> > that make me confused.
>> >when I used value-state in multi parallelism  the value is not I
>> wanted.
>> >So I guess that value-state is in every parallelism. every parallelism
>> > saved their only value  which means the value is Thread-Level
>> >But when I used map-state,the value is correctly. I mean the map-state
>> > was shared by every parallelism.
>> >   looking forward to your reply
>> >
>> >
>> > lxk7...@163.com
>> >
>>


Re:Re: Re: Some question with Flink state

2022-05-24 Thread Xuyang
看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>这样呢
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:17
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
> 
>图片好像有点问题,重新上传一下
>lxk7...@163.com
>From: Hangxiang Yu
>Date: 2022-05-24 12:09
>To: user-zh
>Subject: Re: Re: Some question with Flink state
>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>
>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>
>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>
>>
>>
>> lxk7...@163.com
>>
>> From: Hangxiang Yu
>> Date: 2022-05-23 23:09
>> To: user-zh; lxk7491
>> Subject: Re: Some question with Flink state
>> Hello,
>> All states will not be shared in different parallelisms.
>> BTW, English questions could be sent to u...@flink.apache.org.
>>
>> Best,
>> Hangxiang.
>>
>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>
>> >
>> > Hi everyone
>> >I was used Flink keyed-state in my Project.But I found some questions
>> > that make me confused.
>> >when I used value-state in multi parallelism  the value is not I
>> wanted.
>> >So I guess that value-state is in every parallelism. every parallelism
>> > saved their only value  which means the value is Thread-Level
>> >But when I used map-state,the value is correctly. I mean the map-state
>> > was shared by every parallelism.
>> >   looking forward to your reply
>> >
>> >
>> > lxk7...@163.com
>> >
>>


Re: Re: Some question with Flink state

2022-05-24 Thread lxk7...@163.com

https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png

这样呢


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
Hi, 你的图还是挂了,可以使用图床工具试一下
 
 
 
在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
 
图片好像有点问题,重新上传一下
lxk7...@163.com
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Re: Some question with Flink state

2022-05-24 Thread lxk7...@163.com
[URL=https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png][IMG]https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png[/IMG][/URL]
[URL=https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png][IMG]https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png[/IMG][/URL]
看下这个是否能看见图片


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
Hi, 你的图还是挂了,可以使用图床工具试一下
 
 
 
在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
 
图片好像有点问题,重新上传一下
lxk7...@163.com
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Python Job Type Support in Flink Kubernetes Operator

2022-05-24 Thread Gyula Fóra
Hi Jeesmon!

Sorry I completely missed this question earlier :)

There is no support currently for Python jobs and I don't really have any
experience with Python jobs so cannot really comment on how easy it would
be to integrate it.

We and most of the companies currently involved with developing the
operators focus on Flink production jobs built on Java so this feature is
not on our radar at the moment.
If this is something interesting for you and you would like to investigate
and contribute to it we would be happy to help you along the way.

Cheers,
Gyula

On Tue, May 24, 2022 at 5:24 AM Jeesmon Jacob  wrote:

> Hi Gyula,
>
> Any idea on this? We are exploring current limitations of using the
> operator for Flink deployment and if there is a plan to support Python jobs
> in future will help us.
>
> Thanks,
> Jeesmon
>
> On Fri, May 20, 2022 at 3:46 PM Jeesmon Jacob  wrote:
>
>> Hi there,
>>
>> Is there a plan to support Python Job Type in Flink Kubernetes Operator?
>> If yes, any ETA?
>>
>> According to this previous operator overview only Java jobs are supported
>> in operator. This page was recently modified to remove the features table.
>>
>>
>> https://github.com/apache/flink-kubernetes-operator/blob/73369b851f2cd92a6818bb84e21157518d63a48d/docs/content/docs/concepts/overview.md
>>
>> Job Type Jar job full
>>
>> SQL Job no
>>
>> Python Job no
>>
>> Thanks,
>> Jeesmon
>>
>


Re: Python Job Type Support in Flink Kubernetes Operator

2022-05-24 Thread Jeesmon Jacob
Hi Gyula,

Any idea on this? We are exploring current limitations of using the
operator for Flink deployment and if there is a plan to support Python jobs
in future will help us.

Thanks,
Jeesmon

On Fri, May 20, 2022 at 3:46 PM Jeesmon Jacob  wrote:

> Hi there,
>
> Is there a plan to support Python Job Type in Flink Kubernetes Operator?
> If yes, any ETA?
>
> According to this previous operator overview only Java jobs are supported
> in operator. This page was recently modified to remove the features table.
>
>
> https://github.com/apache/flink-kubernetes-operator/blob/73369b851f2cd92a6818bb84e21157518d63a48d/docs/content/docs/concepts/overview.md
>
> Job Type Jar job full
>
> SQL Job no
>
> Python Job no
>
> Thanks,
> Jeesmon
>


Re:Re: Re: Some question with Flink state

2022-05-24 Thread Xuyang
Hi, 你的图还是挂了,可以使用图床工具试一下



在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:

图片好像有点问题,重新上传一下
lxk7...@163.com
 
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
 
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
 
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>

Re: Application mode deployment through API call

2022-05-24 Thread Peter Schrott
Hi Vikash,

Could you be more precise about the shared libraries? Is there any
documentation about this?

Thanks, Peter

On Tue, May 24, 2022 at 1:23 PM Vikash Dat  wrote:

> Similar to agent Biao, Application mode is okay if you only have a single
> app, but when running multiple apps session mode is better for control. In
> my experience, the CLIFrontend is not as robust as the REST API, or you
> will end up having to rebuild a very similar Rest API. For the meta space
> issue, have you tried adding shared libraries to the flink lib folder?
>
> On Mon, May 23, 2022 at 23:31 Shengkai Fang  wrote:
>
>> Hi, all.
>>
>> > is there any plan in the Flink community to provide an easier way of
>> deploying Flink with application mode on YARN
>>
>> Yes. Jark has already opened a ticket about how to use the sql client to
>> submit the SQL in application mode[1]. What's more, in FLIP-222 we are able
>> to manage the jobs in SQL, which will list all submitted jobs and their web
>> UI[2].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-26541
>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-222%3A+Support+full+query+lifecycle+statements+in+SQL+client
>>
>>
>>


Re: Json Deserialize in DataStream API with array length not fixed

2022-05-24 Thread Qingsheng Ren
Hi Zain,

I assume you are using DataStream API as described in the subject of your 
email, so I think you can define any functions/transformations to parse the 
json value, even the schema is changing. 

It looks like the value of field “array_coordinates” is a an escaped 
json-formatted STRING instead of an json object, so I prefer to parse the input 
json string first using Jackson (or any json parser you like), extract the 
field “array_coordinates” as a string, remove all backslashs to un-escape the 
string, and use Jackson again to parse it. 

If you are using Table / SQL API, I’m afaid you have to use UDTF to parse the 
input because the schema varies in the field “array_coordinates”. 

Hope this could be helpful!

Cheers, 

Qingsheng

> On May 21, 2022, at 14:58, Zain Haider Nemati  wrote:
> 
> Hi Folks,
> I have data coming in this format:
> 
> {
> “data”: {
> “oid__id”:  “61de4f26f01131783f162453”,
> “array_coordinates”:“[ { \“speed\” : \“xxx\“, \“accuracy\” : 
> \“xxx\“, \“bearing\” : \“xxx\“, \“altitude\” : \“xxx\“, \“longitude\” : 
> \“xxx\“, \“latitude\” : \“xxx\“, \“dateTimeStamp\” : \“xxx\“, \“_id\” : { 
> \“$oid\” : \“xxx\” } }, { \“speed\” : \“xxx\“, \“isFromMockProvider\” : 
> \“false\“, \“accuracy\” : \“xxx\“, \“bearing\” : \“xxx\“, \“altitude\” : 
> \“xxx\“, \“longitude\” : \“xxx\“, \“latitude\” : \“xxx\“, \“dateTimeStamp\” : 
> \“xxx\“, \“_id\” : { \“$oid\” : \“xxx\” } }]“,
> “batchId”:  “xxx",
> “agentId”:  “xxx",
> “routeKey”: “40042-12-01-2022",
> “__v”:  0
> },
> “metadata”: {
> “timestamp”:“2022-05-02T18:49:52.619827Z”,
> “record-type”:  “data”,
> “operation”:“load”,
> “partition-key-type”:   “primary-key”,
> “schema-name”:  “xxx”,
> “table-name”:   “xxx”
> }
> }
> 
> Where length of array coordinates array varies is not fixed in the source is 
> their any way to define a json deserializer for this? If so would really 
> appreciate if I can get some help on this



Re: Application mode deployment through API call

2022-05-24 Thread Vikash Dat
Similar to agent Biao, Application mode is okay if you only have a single
app, but when running multiple apps session mode is better for control. In
my experience, the CLIFrontend is not as robust as the REST API, or you
will end up having to rebuild a very similar Rest API. For the meta space
issue, have you tried adding shared libraries to the flink lib folder?

On Mon, May 23, 2022 at 23:31 Shengkai Fang  wrote:

> Hi, all.
>
> > is there any plan in the Flink community to provide an easier way of
> deploying Flink with application mode on YARN
>
> Yes. Jark has already opened a ticket about how to use the sql client to
> submit the SQL in application mode[1]. What's more, in FLIP-222 we are able
> to manage the jobs in SQL, which will list all submitted jobs and their web
> UI[2].
>
> [1] https://issues.apache.org/jira/browse/FLINK-26541
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-222%3A+Support+full+query+lifecycle+statements+in+SQL+client
>
>
>


Re: How can I set job parameter in flink sql

2022-05-24 Thread Qingsheng Ren
Hi,

You can take use of the configuration “pipeline.global-job-parameters” [1] to 
pass your custom configs all the way into the UDF. For example you can execute 
this in SQL client:

SET pipeline.global-job-parameters=black_list_path:/root/list.properties;

Then you can get the value “/root/list.properties” by 
context.getJobParameter(“black_list_path”, “your_default_value”) in the open() 
method of your UDF.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#pipeline-global-job-parameters

Cheers, 

Qingsheng

> On May 11, 2022, at 14:36, wang <24248...@163.com> wrote:
> 
> Hi dear engineer,
> 
> I want to override the function open() in my UDF, like:
> 
> 
> 
> In open() function, I want to fetch the configred value "black_list_path", 
> then simply print that value out. And I config this value in ./sql-client.sh 
> console:
> 
> SET black_list_path = /root/list.properties
> 
> Then I run this UDF, but what printed is /config/list.properties(this is the 
> default value as I set in context.getJobParameter("black_list_path", 
> "/config/list/properties")), not /root/list.properties which I set in 
> ./sql-client.sh console.
> 
> So could you please show me the correct way to set black_list_path is sql ? 
> Thanks so much!
> 
> 
> Thanks && Reards,
> Hunk
> 
> 
> 
> 
> 
>  



GlobalCommitter in Flink's two-phase commit

2022-05-24 Thread di wu
Hello
Regarding the GlobalCommitter in Flink's two-phase commit,
I see it was introduced in FLIP-143, but it seems to have been removed again in 
FLP-191 and marked as Deprecated in the source code.
I haven't found any relevant information about the use of GlobalCommitter.


There are two questions I would like to ask:
1. What are the general usage scenarios of GlobalCommitter?
2. Why should GlobalCommitter be removed in the new version of the api?


Thanks  Regards,


di.wu

GlobalCommitter in Flink's two-phase commit

2022-05-24 Thread di wu
Hello
Regarding the GlobalCommitter in Flink's two-phase commit,
I see it was introduced in FLIP-143, but it seems to have been removed again in 
FLP-191 and marked as Deprecated in the source code.
I haven't found any relevant information about the use of GlobalCommitter.


There are two questions I would like to ask:
1. What are the general usage scenarios of GlobalCommitter?
2. Why should GlobalCommitter be removed in the new version of the api?


Thanks  Regards,


di.wu