flink-ml algorithms

2022-06-02 Thread Natia Chachkhiani
Hi,

I am running OnlineKmeans from flink-ml repo on a small dataset. I've
noticed that I don't get consistent results, assignments to clusters,
across different runs. I have set both parallelism and globalBatchSize to 1.
I am doing simple fit and transform on each data point ingested. Is the
order of processing not guaranteed? Or am I missing something?

Thanks,
Natia


Unable to connect to Flink

2022-06-02 Thread Benjamin Carlton
Greetings all,

I’m a member of IBM’s IT support and we have found that IBM cannot access 
Flink’s URL on the domain: nightlies.apache.org

After investigation, we believe that we may have been mistakenly placed on a 
deny list, and we need to request that IBM’s addresses be placed on an allow 
list or verified as already belonging to an allow list for the above listed 
domain.

I have already emailed the Apache.org webmaster and created a Flink Jira ticket 
(https://issues.apache.org/jira/browse/FLINK-27887) but we were hoping to 
verify the correct process for our request. If you could help us gain access to 
the above domain from the attached list of addresses, it would be most 
appreciated. If there is another process I need to follow, please let me know 
and I will be happy to follow that process instead. Or, if there’s a specific 
person to whom I should make my request, I would appreciate their name.

Thank you.

Best regards,

Benjamin Carlton
Exec IT Support Advisor | Executive Start Program
IBM
benjamin.carl...@ibm.com



Re: [ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 Thread Jing Ge
Thanks everyone for your effort!

Best regards,
Jing

On Thu, Jun 2, 2022 at 4:17 PM Martijn Visser 
wrote:

> Thanks everyone for joining! It's good to see so many have joined in such
> a short time already. I've just refreshed the link which you can always
> find on the project website [1]
>
> Best regards, Martijn
>
> [1] https://flink.apache.org/community.html#slack
>
> Op do 2 jun. 2022 om 11:42 schreef Jingsong Li :
>
>> Thanks Xingtong, Jark, Martijn and Robert for making this possible!
>>
>> Best,
>> Jingsong
>>
>>
>> On Thu, Jun 2, 2022 at 5:32 PM Jark Wu  wrote:
>>
>>> Thank Xingtong for making this possible!
>>>
>>> Cheers,
>>> Jark Wu
>>>
>>> On Thu, 2 Jun 2022 at 15:31, Xintong Song  wrote:
>>>
>>> > Hi everyone,
>>> >
>>> > I'm very happy to announce that the Apache Flink community has created
>>> a
>>> > dedicated Slack workspace [1]. Welcome to join us on Slack.
>>> >
>>> > ## Join the Slack workspace
>>> >
>>> > You can join the Slack workspace by either of the following two ways:
>>> > 1. Click the invitation link posted on the project website [2].
>>> > 2. Ask anyone who already joined the Slack workspace to invite you.
>>> >
>>> > We recommend 2), if available. Due to Slack limitations, the invitation
>>> > link in 1) expires and needs manual updates after every 100 invites.
>>> If it
>>> > is expired, please reach out to the dev / user mailing lists.
>>> >
>>> > ## Community rules
>>> >
>>> > When using the community Slack workspace, please follow these community
>>> > rules:
>>> > * *Be respectful* - This is the most important rule!
>>> > * All important decisions and conclusions *must be reflected back to
>>> the
>>> > mailing lists*. "If it didn’t happen on a mailing list, it didn’t
>>> happen."
>>> > - The Apache Mottos [3]
>>> > * Use *Slack threads* to keep parallel conversations from overwhelming
>>> a
>>> > channel.
>>> > * Please *do not direct message* people for troubleshooting, Jira
>>> assigning
>>> > and PR review. These should be picked-up voluntarily.
>>> >
>>> >
>>> > ## Maintenance
>>> >
>>> >
>>> > Committers can refer to this wiki page [4] for information needed for
>>> > maintaining the Slack workspace.
>>> >
>>> >
>>> > Thanks Jark, Martijn and Robert for helping setting up the Slack
>>> workspace.
>>> >
>>> >
>>> > Best,
>>> >
>>> > Xintong
>>> >
>>> >
>>> > [1] https://apache-flink.slack.com/
>>> >
>>> > [2] https://flink.apache.org/community.html#slack
>>> >
>>> > [3] http://theapacheway.com/on-list/
>>> >
>>> > [4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management
>>> >
>>>
>>


Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-06-02 Thread Jan Lukavský

-user@flink  as this looks like a purely Beam issue

Could you please elaborate more about what "stuck" means? Does the 
watermark stop progressing? Does that happen at any specific instant 
(e.g. end of window or end of window + allowed lateness)?


On 6/1/22 15:43, Gorjan Todorovski wrote:

Hi Jan,

I had not checked the harness log before. I have now checked it (the Apache 
Beam worker log) and found this, but I'm currently not sure what it means:


2022/06/01 13:34:40 Python exited: 
2022/06/01 13:34:41 Python exited: 
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 587, in 
    target=lambda: self._read_inputs(elements_iterator),
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 570, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.CANCELLED
    details = "Multiplexer hanging up"
    debug_error_string = "{"created":"@1654090485.252525992","description":"Error received from peer ipv4:127.0.0.1:44439","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>

2022/06/01 13:34:45 Python exited: 
2022/06/01 13:34:46 Python exited: 
2022/06/01 13:34:46 Python exited: 
2022/06/01 13:34:47 Python exited: 
Starting worker with command ['/opt/apache/beam/boot', '--id=3-1', 
'--logging_endpoint=localhost:44267', 
'--artifact_endpoint=localhost:36413', 
'--provision_endpoint=localhost:42179', 
'--control_endpoint=localhost:38825']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-3', 
'--logging_endpoint=localhost:38683', 
'--artifact_endpoint=localhost:44867', 
'--provision_endpoint=localhost:34833', 
'--control_endpoint=localhost:44351']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-2', 
'--logging_endpoint=localhost:35391', 
'--artifact_endpoint=localhost:46571', 
'--provision_endpoint=localhost:44073', 
'--control_endpoint=localhost:44133']

Starting work...

On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský  wrote:

Hi Gorjan,

+user@beam 

The trace you posted is just waiting for a bundle to finish in the
SDK harness. I would suspect there is a problem in the logs of the
harness. Did you look for possible errors there?

 Jan

On 5/31/22 13:54, Gorjan Todorovski wrote:

Hi,

I am running a TensorFlow Extended (TFX) pipeline which uses
Apache Beam for data processing which in turn has a Flink Runner
(Basically a batch job on a Flink Session Cluster on Kubernetes)
version 1.13.6, but the job (for gathering stats) gets stuck.

There is nothing significant in the Job Manager or Task Manager
logs. The only thing that possibly might tell why the task is
stuck seems to be a thread dump:

"MapPartition (MapPartition at [14]{TFXIORead[train], GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on java.util.concurrent.CompletableFuture$Signaller@6f078632
    at sun.misc.Unsafe.park(Native Method)
    - waiting on java.util.concurrent.CompletableFuture$Signaller@6f078632
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
    ...
I use a parallelism of 32. Task managers are set up so each TM runs
in one container with 1 CPU and a total process memory of 20
GB. Each TM runs 1 task slot.
This is failing with ~100 files with a total size of about 100
GB. If I run the pipeline with a smaller number of files to
process, it runs fine.
I need Flink to be able to process varying amounts of data, since
it can scale by automatically adding pods based on the
parallelism setting for the specific job (I set the parallelism
to max(number of files, 32)).

Re: [ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 Thread Martijn Visser
Thanks everyone for joining! It's good to see so many have joined in such a
short time already. I've just refreshed the link which you can always find
on the project website [1]

Best regards, Martijn

[1] https://flink.apache.org/community.html#slack

Op do 2 jun. 2022 om 11:42 schreef Jingsong Li :

> Thanks Xingtong, Jark, Martijn and Robert for making this possible!
>
> Best,
> Jingsong
>
>
> On Thu, Jun 2, 2022 at 5:32 PM Jark Wu  wrote:
>
>> Thank Xingtong for making this possible!
>>
>> Cheers,
>> Jark Wu
>>
>> On Thu, 2 Jun 2022 at 15:31, Xintong Song  wrote:
>>
>> > Hi everyone,
>> >
>> > I'm very happy to announce that the Apache Flink community has created a
>> > dedicated Slack workspace [1]. Welcome to join us on Slack.
>> >
>> > ## Join the Slack workspace
>> >
>> > You can join the Slack workspace by either of the following two ways:
>> > 1. Click the invitation link posted on the project website [2].
>> > 2. Ask anyone who already joined the Slack workspace to invite you.
>> >
>> > We recommend 2), if available. Due to Slack limitations, the invitation
>> > link in 1) expires and needs manual updates after every 100 invites. If
>> it
>> > is expired, please reach out to the dev / user mailing lists.
>> >
>> > ## Community rules
>> >
>> > When using the community Slack workspace, please follow these community
>> > rules:
>> > * *Be respectful* - This is the most important rule!
>> > * All important decisions and conclusions *must be reflected back to the
>> > mailing lists*. "If it didn’t happen on a mailing list, it didn’t
>> happen."
>> > - The Apache Mottos [3]
>> > * Use *Slack threads* to keep parallel conversations from overwhelming a
>> > channel.
>> > * Please *do not direct message* people for troubleshooting, Jira
>> assigning
>> > and PR review. These should be picked-up voluntarily.
>> >
>> >
>> > ## Maintenance
>> >
>> >
>> > Committers can refer to this wiki page [4] for information needed for
>> > maintaining the Slack workspace.
>> >
>> >
>> > Thanks Jark, Martijn and Robert for helping setting up the Slack
>> workspace.
>> >
>> >
>> > Best,
>> >
>> > Xintong
>> >
>> >
>> > [1] https://apache-flink.slack.com/
>> >
>> > [2] https://flink.apache.org/community.html#slack
>> >
>> > [3] http://theapacheway.com/on-list/
>> >
>> > [4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management
>> >
>>
>


Re:Re: Re: [Internet]Re: Re: Some question with Flink state

2022-06-02 Thread Xuyang
Hi, theoretically speaking, isn't this statement problematic?


> "That's because with value-state, one task will hold multiple keys, and the contents of different keys will overwrite each other"


Since ValueState is also a kind of keyed state, each key maintains its own ValueState, and different keys are isolated from each other.
In general, storing a Map inside a ValueState is not much different from using MapState directly; they only differ slightly across state backends and in state TTL policy, so storing a Map inside a ValueState is not really recommended.
So it still depends on the specific business scenario; if you are only computing an accumulated value, ValueState is enough.
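The keyed-state isolation described above can be illustrated with a toy model (plain Python, not the actual Flink API; the class and field names here are made up for illustration):

```python
class KeyedValueState:
    """Toy model of Flink's keyed ValueState: the framework scopes every
    access to the current key, so keys never overwrite each other."""
    def __init__(self):
        self._per_key = {}       # key -> value, one slot per key
        self.current_key = None  # set by the framework before each element

    def value(self):
        return self._per_key.get(self.current_key)

    def update(self, v):
        self._per_key[self.current_key] = v

state = KeyedValueState()
for key, amount in [("a", 1), ("b", 10), ("a", 2)]:
    state.current_key = key                      # keyBy() determines this
    state.update((state.value() or 0) + amount)  # per-key accumulation

state.current_key = "a"
print(state.value())  # 3 -- key "b" did not overwrite key "a"
```

This is why a simple accumulated value per key needs only a ValueState, even though one task processes many keys.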




--

Best!
Xuyang





On 2022-05-25 13:38:52, "lxk7...@163.com"  wrote:
>
>I just read up on how key groups work, and I can roughly understand the earlier content. Regarding the following passage:
>"with map-state, the keys inside certain fixed key groups can each be stored separately via the map-state's user-key"
>My understanding is that with value-state, one task will hold multiple keys and the contents of different keys will overwrite each other, whereas with a map, even if the same task has multiple keys, they can still be matched via the user-defined key.
>In that case, most scenarios are actually well suited to map-state.
>
>
>lxk7...@163.com
> 
>From: jurluo(罗凯)
>Date: 2022-05-25 11:05
>To: user-zh@flink.apache.org
>Subject: Re: [Internet]Re: Re: Some question with Flink state
>Hey, this looks fine. The same keys are all assigned to the same task, and it is normal for each task to hold multiple kinds of keys. Keys are split into multiple key groups according to the max parallelism, and then fixed key groups are assigned to the individual tasks. It is only guaranteed that the same key goes to the same task, not that one task has only one key. For your requirement, map-state is the right fit: with map-state, the keys inside certain fixed key groups can each be stored separately via the map-state's user-key.
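The key-to-subtask routing described above can be sketched roughly as follows. This is a simplified Python illustration: real Flink (`KeyGroupRangeAssignment`) additionally applies a murmur hash on top of `hashCode()`, so actual group numbers will differ, but the structure — key → key group → subtask — is the same:

```python
def java_string_hashcode(s: str) -> int:
    """Java's String.hashCode(), reproduced in Python for illustration."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h

def key_group(key: str, max_parallelism: int = 128) -> int:
    # Simplified: real Flink murmur-hashes the hashCode before the modulo.
    return java_string_hashcode(key) % max_parallelism

def operator_index(group: int, max_parallelism: int, parallelism: int) -> int:
    # A fixed, contiguous range of key groups maps to each subtask.
    return group * parallelism // max_parallelism

# The same key always lands on the same subtask; one subtask owns many groups.
for k in ["123", "r-123", "r-4567"]:
    g = key_group(k)
    print(k, "-> key group", g, "-> subtask", operator_index(g, 128, 10))
```

Because many key groups (and thus many keys) map to one subtask, seeing several keys on the same subtask is expected behavior, not a bug.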
> 
>> On May 25, 2022, at 10:45 AM, lxk7...@163.com wrote:
>> 
>> The image seems to be broken again, so I am resending it.
>> Hello, I ran a test and found a problem: when keying by a String-typed field I did not get the correct results, while with an int-typed field the results were correct. The test also showed that with two keyBy calls, it is indeed the second keyBy that takes effect.
>> 
>> Below are my code and test results.
>> 
>> 1. Using the int type
>> 
>> public class KeyByTest {
>> 
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setParallelism(10);
>> 
>>         DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
>>                 new data(1, "123", "首页"),
>>                 new data(1, "123", "分类页"),
>>                 new data(2, "r-123", "搜索结果页"),
>>                 new data(1, "r-123", "我的页"),
>>                 new data(3, "r-4567", "搜索结果页")));
>> 
>>         SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
>>                 .map(new RichMapFunction<data, String>() {
>> 
>>                     @Override
>>                     public String map(data data) throws Exception {
>>                         System.out.println(data.toString() + "的subtask为:" + getRuntimeContext().getIndexOfThisSubtask());
>>                         return data.toString();
>>                     }
>>                 });
>> 
>>         env.execute("test");
>>     }
>> }
>> 
>> class data {
>> 
>>     private int id;
>>     private String goods;
>>     private String pageName;
>> 
>>     public data(int id, String goods, String pageName) {
>>         this.id = id;
>>         this.goods = goods;
>>         this.pageName = pageName;
>>     }
>> 
>>     public data() {
>>     }
>> 
>>     public int getId() {
>>         return id;
>>     }
>> 
>>     public void setId(int id) {
>>         this.id = id;
>>     }
>> 
>>     public String getGoods() {
>>         return goods;
>>     }
>> 
>>     public void setGoods(String goods) {
>>         this.goods = goods;
>>     }
>> 
>>     public String getPageName() {
>>         return pageName;
>>     }
>> 
>>     public void setPageName(String pageName) {
>>         this.pageName = pageName;
>>     }
>> 
>>     @Override
>>     public String toString() {
>>         return "data{" +
>>                 "id='" + id + '\'' +
>>                 ", goods='" + goods + '\'' +
>>                 ", pageName='" + pageName + '\'' +
>>                 '}';
>>     }
>> }
>> 
>> class MyKeySelector implements KeySelector<data, Integer> {
>> 
>>     @Override
>>     public Integer getKey(data data) throws Exception {
>>         return data.getId();
>>     }
>> }
>> The console output is as follows:
>> https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
>> 
>> You can see that the data is grouped by id and distributed across different subtasks.
>> 
>> 2. Using the String type; the code is as follows:
>> 
>> public class KeyByTest {
>> 
>>     public static void main(String[] args) throws Exception {
>> 
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setParallelism(10);
>> 
>>         DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(new data("1", "123", "首页"),
>>

Issue Facing While Using EmbeddedRocksDbCheckpointing FlinkVersion(1.15.0)

2022-06-02 Thread harshit.varsh...@iktara.ai
Dear Team,

 

I am new to PyFlink and would appreciate your support with an issue I am
facing. I am using PyFlink version 1.15.0 and working from the PyFlink
reference code.

The errors I am getting are:

Traceback (most recent call last):
  File "E:\pythonProject16\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 470, in input_elements
    element = received.get(timeout=1)
  File "C:\Users\Admin\AppData\Local\Programs\Python\Python38\lib\queue.py", line 178, in get
    raise Empty
_queue.Empty

RuntimeError: Channel closed prematurely.

My code is:

import json
import os
import time
from datetime import datetime

from pyflink.common import SimpleStringSchema, JsonRowDeserializationSchema, Types, JsonRowSerializationSchema
from pyflink.datastream import StreamExecutionEnvironment, WindowFunction, HashMapStateBackend, CheckpointingMode, \
    FileSystemCheckpointStorage, KeyedProcessFunction, RuntimeContext, EmbeddedRocksDBStateBackend, RocksDBStateBackend
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, ListStateDescriptor
from sklearn.preprocessing import LabelEncoder
import pickle
import pandas as pd
from pyflink.common import Row

import argparse
from typing import Iterable

from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy

from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows


class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[0])


class CountWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        # return [(key, result)]
        return [(key, len([e for e in inputs]))]


class Storage(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.days(7)) \
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
            .disable_cleanup_in_background() \
            .build()
        state_descriptor.enable_time_to_live(state_ttl_config)
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = 0
        current = value[1]
        self.state.update(current)

        yield current, time.time()


def write_to_kafka():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.enable_checkpointing(1000)
    env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
    env.set_state_backend(EmbeddedRocksDBStateBackend())
    env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
    # env.get_checkpoint_config().enable_unaligned_checkpoints()
    check = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                         'checkpoint-dir11')
    env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///{}".format(check)))
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.14.4.jar')
    env.add_jars("file:///{}".format(kafka_jar))
    rocksdb_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                               'flink-statebackend-rocksdb_2.11-1.0.0.jar')
    env.add_jars("file:///{}".format(rocksdb_jar))
    # deserialization_schema = SimpleStringSchema()
    deserialization_schema = JsonRowDeserializationSchema.builder() \
        .type_info(type_info=Types.ROW_NAMED(["time_stamp", "Bill_number", "Store_Code", "itemdescription", "Item_code",
                                              "Gross_Price", "Discount", "Net_Price", "purchaseorReturn",
                                              "Membership_No", "Billing_Date", "Billing_Time"],
                                             [Types.DOUBLE(), Types.STRING(), Types.INT(), Types.STRING(),
                                              Types.STRING(), Types.FLOAT(), Types.FLOAT(), Types.FLOAT(),
                                              Types.STRING(), Types.STRING(), Types.STRING(),

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

2022-06-02 Thread Martijn Visser
I believe the change between Flink 1.14 and Flink 1.15 has been the
addition of a RecoverableWriter for GCS [1]

Perhaps this is the reason for this failure?

Best regards, Martijn

[1] https://issues.apache.org/jira/browse/FLINK-11838

Op do 2 jun. 2022 om 12:24 schreef Qingsheng Ren :

> Thanks for the input ChangZhuo.
>
> Could you check if the configuration "classloader.resolve-order” is
> set to “parent-first” in your Flink 1.14 cluster? I didn’t notice any
> changes related to the user code classloader in Flink 1.15. If my
> assumption is correct, you package the gcs-connector into your job JAR
> but the Hadoop FS dependencies are not included, so
> org.apache.hadoop.fs.FileSystem is loaded by app classloader from
> flink-s3-fs-hadoop.jar under the lib of Flink, but
> GoogleHadoopFileSystem is loaded by user code classloader from job
> JAR. Setting the resolve order to "parent-first" could bypass the
> issue [1] so I assume you have this config in 1.14 but not in 1.15.
> Please forgive me if I understand incorrectly!
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
>
> On Thu, Jun 2, 2022 at 11:22 AM ChangZhuo Chen (陳昌倬) 
> wrote:
> >
> > On Thu, Jun 02, 2022 at 11:17:19AM +0800, Qingsheng Ren wrote:
> > > Hi ChangZhuo,
> > >
> > > I assume it’s a classloading issue but I can’t track down to the root
> cause in code. Would you mind sharing the entire exception stack and some
> JM/TM logs related to file system?
> >
> > The following is exception log we have. Please let us know if you need
> > other logs.
> >
> > ps. .listPath() is the function I mentioned earlier.
> >
> >
> > 2022-06-02 00:25:57,825 WARN org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
> > java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> >         at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
> >         at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
> >         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
> >         at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
> >         at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
> >         at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323) ~[flink-dist-1.15.0.jar:1.15.0]
> >         at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244) ~[flink-dist-1.15.0.jar:1.15.0]
> >         at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
> >         at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> >         at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >         at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >         at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> >         at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
> >         at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
> >         at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
> >         at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
> >         at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
> > Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> >         ... 14 more
> > Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in 

Re:Not able to see std output in console/.out files with table API

2022-06-02 Thread Xuyang
Can you see in the Flink UI that the input count of the `sink` node is 
accumulating? Is it possible that there is actually no data after the join?


If you have checked the above and everything seems OK, you can try again with 
the `print` connector as the sink table and check whether the .out file has 
data.




--

Best!
Xuyang




At 2022-06-02 18:10:28, "Zain Haider Nemati"  wrote:

Hi, 
We are using the Table API to integrate and transform data sources and then 
converting them to a DataStream. We want to inspect the data format, so we 
added a .print() sink to the DataStream, but the .out files do not show any 
output. We do see records coming in from the metrics in the Flink UI, though. 
Any suggestions on where to look for potential issues?


code:
tEnv.executeSql("CREATE TABLE orders (\n" +
        "  id  BIGINT,\n" +
        "  customer_id BIGINT\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'orders',\n" +
        "  'properties.bootstrap.servers' = '...',\n" +
        "  'scan.startup.mode' = 'earliest-offset',\n" +
        "  'format' = 'json'\n" +
        ")");
Table result = tEnv.sqlQuery("SELECT o.id AS order_id,\n" +
        "  dbo.batch_id AS batch_id,\n" +
        "  o.customer_id AS customer_id,\n" +
        "  dbo.delivery_priority AS delivery_priority\n" +
        "FROM orders o\n" +
        "INNER JOIN delivery_batch_orders dbo ON o.id = dbo.order_id\n"
);

tEnv.toAppendStream(result, StringValue.class).print();
env.execute();


Flink Version : 1.13.1

Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

2022-06-02 Thread Qingsheng Ren
Thanks for the input, ChangZhuo.

Could you check if the configuration "classloader.resolve-order” is
set to “parent-first” in your Flink 1.14 cluster? I didn’t notice any
changes related to the user code classloader in Flink 1.15. If my
assumption is correct, you package the gcs-connector into your job JAR
but the Hadoop FS dependencies are not included, so
org.apache.hadoop.fs.FileSystem is loaded by app classloader from
flink-s3-fs-hadoop.jar under the lib of Flink, but
GoogleHadoopFileSystem is loaded by user code classloader from job
JAR. Setting the resolve order to "parent-first" could bypass the
issue [1] so I assume you have this config in 1.14 but not in 1.15.
Please forgive me if I understand incorrectly!

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
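This hypothesis hinges on which loader ends up defining each class under the two resolve orders. A toy model of the delegation (plain Python, purely illustrative — real JVM classloaders are far more involved, and the sets of class names below are hypothetical):

```python
def resolve(class_name, parent_loader, child_loader, order="child-first"):
    """Toy model of classloader delegation.

    Each 'loader' is just a set of class names it can define; the return
    value says which loader defines the class under the given resolve order.
    """
    chain = [("child", child_loader), ("parent", parent_loader)]
    if order == "parent-first":
        chain.reverse()
    for loader_name, definable in chain:
        if class_name in definable:
            return loader_name
    raise KeyError(class_name)


# Hypothetical setup mirroring the report: Hadoop's FileSystem ships both in
# lib/ (parent) and in the fat job JAR (child); the GCS connector only in the JAR.
parent = {"org.apache.hadoop.fs.FileSystem"}
child = {"org.apache.hadoop.fs.FileSystem",
         "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"}

# child-first: FileSystem is defined by the child loader, so code compiled
# against the parent's copy sees a *different* class -> cast fails at runtime.
print(resolve("org.apache.hadoop.fs.FileSystem", parent, child))  # child

# parent-first: FileSystem resolves through the parent wherever possible,
# so both sides agree on a single definition.
print(resolve("org.apache.hadoop.fs.FileSystem", parent, child,
              order="parent-first"))  # parent
```

The GCS connector class still resolves to the child loader under either order, since only the job JAR defines it — which is exactly why setting parent-first only helps for the classes duplicated across both loaders.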

On Thu, Jun 2, 2022 at 11:22 AM ChangZhuo Chen (陳昌倬)  wrote:
>
> On Thu, Jun 02, 2022 at 11:17:19AM +0800, Qingsheng Ren wrote:
> > Hi ChangZhuo,
> >
> > I assume it’s a classloading issue but I can’t track down to the root cause 
> > in code. Would you mind sharing the entire exception stack and some JM/TM 
> > logs related to file system?
>
> The following is exception log we have. Please let us know if you need
> other logs.
>
> ps. .listPath() is the function I mentioned earlier.
>
>
> 2022-06-02 00:25:57,825 WARN  
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
> [] - Application failed unexpectedly:
> java.util.concurrent.CompletionException: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.
> at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
> Source) ~[?:?]
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) 
> ~[?:?]
> at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) 
> ~[?:?]
> at java.util.concurrent.CompletableFuture.postComplete(Unknown 
> Source) ~[?:?]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) 
> ~[?:?]
> at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:323)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at 
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:244)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
> Source) ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at 
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
>  ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>  ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
>  ~[flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) 
> [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
>  [flink-rpc-akka_a0bd817c-414f-468f-88c1-4b3d8be7f4c7.jar:1.15.0]
> at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
> at 
> java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
> at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
> at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) 
> [?:?]
> at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) 
> [?:?]
> Caused by: 
> org.apache.flink.client.deployment.application.ApplicationExecutionException: 
> Could not execute application.
> ... 14 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: 
> The main method caused an error: class 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class 
> org.apache.hadoop.fs.FileSystem 
> (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and 
> org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>  ~[flink-dist-1.15.0.jar:1.15.0]
> at 
> 

Not able to see std output in console/.out files with table API

2022-06-02 Thread Zain Haider Nemati
Hi,
We are using the Table API to integrate and transform data sources and
convert them to a DataStream. We want to inspect the data format, so we added
a .print() sink to the DataStream, but the .out files do not show any output.
We do see records coming in from the metrics in the Flink UI, though.
Any suggestions on where to look for potential issues?

code:
tEnv.executeSql("CREATE TABLE orders (\n" +
"id  BIGINT,\n" +
"customer_id BIGINT\n" +
") WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'orders',\n" +
"'properties.bootstrap.servers' = '...',\n" +
"'scan.startup.mode' = 'earliest-offset',\n" +
"'format' = 'json'\n" +
")");
Table result = tEnv.sqlQuery("SELECT o.id AS order_id,\n" +
" dbo.batch_id AS batch_id,\n" +
" o.customer_id AS customer_id,\n" +
" dbo.delivery_priority AS delivery_priority\n" +
" FROM orders o\n" +
" INNER JOIN delivery_batch_orders dbo ON o.id = dbo.order_id\n"
);

tEnv.toAppendStream(result, StringValue.class).print();
env.execute();

Flink Version : 1.13.1


Slow tests on Flink 1.15

2022-06-02 Thread Lasse Nedergaard
Hi,

I just tried to upgrade from 1.14.2 to 1.15.0. It went well and our jobs run as
expected.

We have a number of tests that exercise the entire job: we mock input and output
and start the job with mocked data. After upgrading, a simple test now takes
minutes where it previously took less than a minute.
If I run a test in debug mode, data is processed right away, but the job gets
stuck in the park method of the jdk.internal.misc.Unsafe object in Java 11,
called from StreamExecutionEnvironment.execute (job client mini cluster).

Any idea why this happens and what I'm missing?

Med venlig hilsen / Best regards
Lasse Nedergaard



Re: [ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 Thread Jingsong Li
Thanks Xintong, Jark, Martijn and Robert for making this possible!

Best,
Jingsong


On Thu, Jun 2, 2022 at 5:32 PM Jark Wu  wrote:

> Thank Xingtong for making this possible!
>
> Cheers,
> Jark Wu
>
> On Thu, 2 Jun 2022 at 15:31, Xintong Song  wrote:
>
> > Hi everyone,
> >
> > I'm very happy to announce that the Apache Flink community has created a
> > dedicated Slack workspace [1]. Welcome to join us on Slack.
> >
> > ## Join the Slack workspace
> >
> > You can join the Slack workspace by either of the following two ways:
> > 1. Click the invitation link posted on the project website [2].
> > 2. Ask anyone who already joined the Slack workspace to invite you.
> >
> > We recommend 2), if available. Due to Slack limitations, the invitation
> > link in 1) expires and needs manual updates after every 100 invites. If
> it
> > is expired, please reach out to the dev / user mailing lists.
> >
> > ## Community rules
> >
> > When using the community Slack workspace, please follow these community
> > rules:
> > * *Be respectful* - This is the most important rule!
> > * All important decisions and conclusions *must be reflected back to the
> > mailing lists*. "If it didn’t happen on a mailing list, it didn’t
> happen."
> > - The Apache Mottos [3]
> > * Use *Slack threads* to keep parallel conversations from overwhelming a
> > channel.
> > * Please *do not direct message* people for troubleshooting, Jira
> assigning
> > and PR review. These should be picked-up voluntarily.
> >
> >
> > ## Maintenance
> >
> >
> > Committers can refer to this wiki page [4] for information needed for
> > maintaining the Slack workspace.
> >
> >
> > Thanks Jark, Martijn and Robert for helping setting up the Slack
> workspace.
> >
> >
> > Best,
> >
> > Xintong
> >
> >
> > [1] https://apache-flink.slack.com/
> >
> > [2] https://flink.apache.org/community.html#slack
> >
> > [3] http://theapacheway.com/on-list/
> >
> > [4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management
> >
>



Re: [ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 Thread Jark Wu
Thanks Xintong for making this possible!

Cheers,
Jark Wu

On Thu, 2 Jun 2022 at 15:31, Xintong Song  wrote:

> Hi everyone,
>
> I'm very happy to announce that the Apache Flink community has created a
> dedicated Slack workspace [1]. Welcome to join us on Slack.
>
> ## Join the Slack workspace
>
> You can join the Slack workspace by either of the following two ways:
> 1. Click the invitation link posted on the project website [2].
> 2. Ask anyone who already joined the Slack workspace to invite you.
>
> We recommend 2), if available. Due to Slack limitations, the invitation
> link in 1) expires and needs manual updates after every 100 invites. If it
> is expired, please reach out to the dev / user mailing lists.
>
> ## Community rules
>
> When using the community Slack workspace, please follow these community
> rules:
> * *Be respectful* - This is the most important rule!
> * All important decisions and conclusions *must be reflected back to the
> mailing lists*. "If it didn’t happen on a mailing list, it didn’t happen."
> - The Apache Mottos [3]
> * Use *Slack threads* to keep parallel conversations from overwhelming a
> channel.
> * Please *do not direct message* people for troubleshooting, Jira assigning
> and PR review. These should be picked-up voluntarily.
>
>
> ## Maintenance
>
>
> Committers can refer to this wiki page [4] for information needed for
> maintaining the Slack workspace.
>
>
> Thanks Jark, Martijn and Robert for helping setting up the Slack workspace.
>
>
> Best,
>
> Xintong
>
>
> [1] https://apache-flink.slack.com/
>
> [2] https://flink.apache.org/community.html#slack
>
> [3] http://theapacheway.com/on-list/
>
> [4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management
>



Re: Is there an HA solution to run flink job with multiple source

2022-06-02 Thread Bariša Obradović
Hi,
our use case is that the data sources are independent: we use Flink to
ingest data from Kafka sources, do a bit of filtering, and then write it to
S3.
Since we ingest from multiple Kafka sources and they are independent, we
consider them all optional. Even if just one Kafka is up and running, we
would like to process its data.

We use a single Flink job, since we find it easier to manage fewer Flink
jobs, and that way we also use fewer resources.

So far, the idea from Xuyang seems doable to me. I'll explore subclassing
the existing Kafka source and making sure the source can keep functioning
even if Kafka is down.
In essence, we would like to treat Kafka being down the same as Kafka being
up but having no data.
The one caveat I can think of is to add metrics and logs when Kafka is down,
so we can still be aware of it if we need to.
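A minimal sketch of that "broker down looks like an empty poll" idea (a hypothetical helper in Python for illustration only — the real implementation would live inside the subclassed Java Kafka source):

```python
import logging

logger = logging.getLogger("optional-kafka-source")

def poll_safely(poll, on_error=None):
    """Call poll(); if the broker is unreachable, behave as if it returned
    no records, while surfacing the outage via a log line and an optional
    callback (e.g. a metrics counter)."""
    try:
        return poll()
    except ConnectionError as exc:
        logger.warning("Kafka unreachable, treating as empty poll: %s", exc)
        if on_error is not None:
            on_error(exc)
        return []

def broken_poll():
    # Simulates a source whose broker is down.
    raise ConnectionError("broker unreachable")

print(poll_safely(broken_poll))            # []
print(poll_safely(lambda: ["record-1"]))   # ['record-1']
```

The job keeps consuming from the healthy sources, and the `on_error` hook gives the awareness (metrics/logs) mentioned above.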

Cheers,
Barisa

On Wed, 1 Jun 2022 at 23:23, Alexander Fedulov 
wrote:

> Hi Bariša,
>
> The way I see it is you either
> - need data from all sources because you are doing some
> conjoint processing. In that case stopping the pipeline is usually the
> right thing to do.
> - the streams consumed from multiple servers are not combined and hence
> could be processed in independent Flink jobs.
> Maybe you could explain where specifically your situation does not fit in
> one of those two scenarios?
>
> Best,
> Alexander Fedulov
>
>
> On Wed, Jun 1, 2022 at 10:57 PM Jing Ge  wrote:
>
>> Hi Bariša,
>>
>> Could you share the reason why your data processing pipeline should keep
>> running when one kafka source is down?
>> It seems like any one among the multiple kafka sources is optional for
>> the data processing logic, because any kafka source could be the one that
>> is down.
>>
>> Best regards,
>> Jing
>>
>> On Wed, Jun 1, 2022 at 5:59 PM Xuyang  wrote:
>>
>>> I think you can try using a custom source for that: even if one of the
>>> Kafka sources is down, the operator keeps running (it just does nothing).
>>> The only trouble is that you need to manage checkpointing and a few other
>>> things yourself. The good news is that you can copy the implementation of
>>> the existing Kafka source and change a little code quite conveniently.
>>>
>>> At 2022-06-01 22:38:39, "Bariša Obradović"  wrote:
>>>
>>> Hi,
>>> we are running a flink job with multiple kafka sources connected to
>>> different kafka servers.
>>>
>>> The problem we are facing is that when one of the Kafkas is down, the
>>> flink job starts restarting.
>>> Is there any way for Flink to pause processing of the Kafka which is
>>> down, and yet continue processing from other sources?
>>>
>>> Cheers,
>>> Barisa
>>>
>>>


[ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 Thread Xintong Song
Hi everyone,

I'm very happy to announce that the Apache Flink community has created a
dedicated Slack workspace [1]. Welcome to join us on Slack.

## Join the Slack workspace

You can join the Slack workspace by either of the following two ways:
1. Click the invitation link posted on the project website [2].
2. Ask anyone who already joined the Slack workspace to invite you.

We recommend 2), if available. Due to Slack limitations, the invitation
link in 1) expires and needs manual updates after every 100 invites. If it
is expired, please reach out to the dev / user mailing lists.

## Community rules

When using the community Slack workspace, please follow these community
rules:
* *Be respectful* - This is the most important rule!
* All important decisions and conclusions *must be reflected back to the
mailing lists*. "If it didn’t happen on a mailing list, it didn’t happen."
- The Apache Mottos [3]
* Use *Slack threads* to keep parallel conversations from overwhelming a
channel.
* Please *do not direct message* people for troubleshooting, Jira assigning
and PR review. These should be picked-up voluntarily.


## Maintenance


Committers can refer to this wiki page [4] for information needed for
maintaining the Slack workspace.


Thanks Jark, Martijn and Robert for helping set up the Slack workspace.


Best,

Xintong


[1] https://apache-flink.slack.com/

[2] https://flink.apache.org/community.html#slack

[3] http://theapacheway.com/on-list/

[4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management



Allow KafkaBuilder to set arbitrary subscribers

2022-06-02 Thread Salva Alcántara
FYI, I have already created the following issue:

- https://issues.apache.org/jira/browse/FLINK-27872

From the issue description directly:

Currently, KafkaBuilder has two setters:

   - setTopics
   - setTopicPattern

which under the hood instantiate the corresponding (concrete) subscribers.
This covers the most common needs, I agree, but it might fall short in some
cases. Why not add a more generic setter:

   - setKafkaSubscriber (???)

*Otherwise, how can users read from Kafka in combination with custom
subscribing logic? Without looking much into it, it seems that they
basically cannot, at least without having to replicate some parts of the
connector, which seems rather inconvenient.*
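For illustration, the relationship between the concrete setters and the proposed generic one can be sketched like this (Python pseudocode of the Java builder; `set_kafka_subscriber` and the subscriber classes are hypothetical names modeling the proposal, not the released API):

```python
class TopicListSubscriber:
    """Stand-in for the connector's fixed-topic-list subscriber."""
    def __init__(self, topics):
        self.topics = list(topics)

class TopicPatternSubscriber:
    """Stand-in for the connector's pattern-based subscriber."""
    def __init__(self, pattern):
        self.pattern = pattern

class KafkaSourceBuilder:
    """Sketch: the concrete setters become thin wrappers over one generic setter."""
    def __init__(self):
        self.subscriber = None

    def set_kafka_subscriber(self, subscriber):
        # The proposed generic entry point: accept any subscriber implementation.
        self.subscriber = subscriber
        return self

    def set_topics(self, *topics):
        return self.set_kafka_subscriber(TopicListSubscriber(topics))

    def set_topic_pattern(self, pattern):
        return self.set_kafka_subscriber(TopicPatternSubscriber(pattern))

builder = KafkaSourceBuilder().set_topics("orders", "payments")
print(type(builder.subscriber).__name__)  # TopicListSubscriber
```

With such a generic setter, users could plug in custom subscribing logic without replicating any part of the connector.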


Re: Re: Data loss when writing from Flink to CK

2022-06-02 Thread lxk7...@163.com

Our program is running normally overall; no errors have occurred, and
checkpointing is enabled.
Today I looked into related material and found an existing Flink issue similar
to mine: [FLINK-23721] Flink SQL state TTL has no effect when using
non-incremental RocksDBStateBackend - ASF JIRA (apache.org). The main cause is
that I use GROUP BY in SQL with a TTL configured, but the TTL does not take
effect with the RocksDB state backend, which causes managed memory usage to
fill up completely.
My current workaround is to use the FsStateBackend; with it, managed memory
shows no problems so far, and I will keep watching the overall data-volume
difference.


lxk7...@163.com
 
From: yue ma
Sent: 2022-06-02 15:05
To: user-zh
Subject: Re: Data loss when writing from Flink to CK
Hi, you can first check whether your job has checkpointing enabled, and
whether any failover occurred while the job was running.
 
lxk  wrote on Thu, Jun 2, 2022 at 11:38:
 
> Hi all, I have a question.
> We are currently writing data from Flink to CK using the DataStream
> API and the RocksDB state backend. The program uses two windows, both at the
> 10-second level. We also use SQL for a GROUP BY sum; the sum has no window,
> and the StreamTableEnvironment state TTL is set to 10s.
> When comparing with the offline side, the data differs little from offline
> from 00:00 to 17:00 (event time), but after 18:00 (event time) the
> difference on the real-time side is very large.
> In the web UI we see that managed memory usage is completely full; I am not
> sure whether this is related.
>
> Another observation: comparing today's data against the (correct) real-time
> table in CK, the overall volume is still much smaller. But when I re-consume
> from 00:00, today's data currently matches. I wonder whether, after the
> program runs for a while, managed memory fills up completely, causing the
> originally cached data to be lost.
> Below are the corresponding operator chain and the overall TM memory usage.
> The backpressure appears because we restarted consumption from 00:00 today.
>
>
>


Re: Data loss when writing from Flink to CK

2022-06-02 Thread yue ma
Hi, you can first check whether your job has checkpointing enabled, and
whether any failover occurred while the job was running.

lxk  wrote on Thu, Jun 2, 2022 at 11:38:

> Hi all, I have a question.
> We are currently writing data from Flink to CK using the DataStream
> API and the RocksDB state backend. The program uses two windows, both at the
> 10-second level. We also use SQL for a GROUP BY sum; the sum has no window,
> and the StreamTableEnvironment state TTL is set to 10s.
> When comparing with the offline side, the data differs little from offline
> from 00:00 to 17:00 (event time), but after 18:00 (event time) the
> difference on the real-time side is very large.
> In the web UI we see that managed memory usage is completely full; I am not
> sure whether this is related.
>
> Another observation: comparing today's data against the (correct) real-time
> table in CK, the overall volume is still much smaller. But when I re-consume
> from 00:00, today's data currently matches. I wonder whether, after the
> program runs for a while, managed memory fills up completely, causing the
> originally cached data to be lost.
> Below are the corresponding operator chain and the overall TM memory usage.
> The backpressure appears because we restarted consumption from 00:00 today.
>
>
>


Re: Help installing apache-flink and numpy to run flink python examples

2022-06-02 Thread Xingbo Huang
Hi Kenneth,

In Flink 1.15, PyFlink only guarantees support for Python 3.6, 3.7 and
3.8 [1]. In release 1.16, PyFlink will add support for Python 3.9 [2].

Going back to your installation error: in Flink 1.15, the version range of
numpy that PyFlink depends on is numpy>=1.14.3,<1.20, so when you execute
`pip install apache-flink`, the numpy version it tries to install is
1.19.5. However, numpy 1.19.5 does not provide a wheel package for Python
3.10 [3], so it has to be built from source, which often fails; for
details, you can refer to numpy's source-build documentation [4]. The
reason `pip install numpy` on its own succeeded is that it installs the
latest numpy, 1.22.4, by default, which does provide a wheel package for
Python 3.10 [5].

From my point of view, your best option for now is to create a Python
3.8 virtual environment with conda, and then install PyFlink.
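A quick sanity check you can run before installing (the supported-version set below is taken from the PyFlink 1.15 docs in [1]; the helper itself is just an illustrative sketch, not part of PyFlink):

```python
import sys

# Python minor versions that the PyFlink 1.15 docs list as supported
SUPPORTED_MINORS = {(3, 6), (3, 7), (3, 8)}

def pyflink_115_supported(version_info=sys.version_info):
    """True if this interpreter falls in PyFlink 1.15's documented range."""
    return (version_info[0], version_info[1]) in SUPPORTED_MINORS

print(pyflink_115_supported((3, 8, 13)))   # True
print(pyflink_115_supported((3, 10, 4)))   # False -> use e.g. a conda py3.8 env
```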

Best,
Xingbo

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/#environment-requirements
[2] https://issues.apache.org/jira/browse/FLINK-27058
[3] https://pypi.org/project/numpy/1.19.5/#files
[4] https://numpy.org/doc/stable/user/building.html
[5] https://pypi.org/project/numpy/1.22.4/#files

Xuyang  wrote on Thu, Jun 2, 2022 at 10:34:

> Hi, Kenneth, have you tried the setup file [1] in flink-python? It
> may work.
>
> [1] https://github.com/apache/flink/blob/master/flink-python/setup.py
>
> At 2022-06-02 03:53:36, "Kenneth Shine" 
> wrote:
>
> I am using MacOS Monterey 12.3.1
>
> I have downloaded the Flink source and examples from GitHub
>
> I have installed python 3.10.4 from
> https://www.python.org/downloads/macos/
>
> In flink/libexec/examples/python/datastream, I run
>
>
>
> /usr/local/bin/python3 word_count.py
>
> But get error
>
>
>
> Traceback (most recent call last):
>
>   File "
> /usr/local/Cellar/apache-flink/1.15.0/libexec/examples/python/datastream/word_count.py",
> line 22, in 
>
> from pyflink.common import WatermarkStrategy, Encoder, Types
>
> ModuleNotFoundError: No module named 'pyflink'
>
>
>
> To install pyflink, I run
>
> /usr/local/bin/python3 -m pip install apache-flink
>
> But get error
>
>   × Encountered error while trying to install package.
>
>   ╰─> numpy
>
>
>
> So I install numpy
>
> /usr/local/bin/python3 -m pip install numpy
>
> Which is successful.
>
>
>
> Yet when I attempt to install pyflink now
>
> /usr/local/bin/python3 -m pip install apache-flink
>
> I get the same error regarding numpy.
>
> How can I get around this error?
>
>
>
>  The error messages from installing pyFlink are quite long.  I have
> attached more of the end of the error message.
>
>
> Thank you for any help you can give me.
>
>
>