Fwd: Can `DataStream`s "fan-in" to a single sink?

2021-09-22 Thread Antony Southworth
Hi

Firstly, apologies if I commit any faux pas; I have never used a mailing
list before. Googling, reading the Flink docs, and searching the mailing
list archives for "fan-in" didn't turn up much, so I'm hoping someone can
enlighten me here.

My use-case is similar to the following:

DataStream source = env.addSource(
    new FlinkKinesisConsumer("my-kinesis-stream", ...));
SinkFunction sink = new MyCustomSNSSinkFunction("my-sns-topic", ...);

// "fan-out" to several paths
DataStream job1 = source
.assignTimestampsAndWatermarks(...)
.keyBy(...)
.window(TumblingEventTimeWindows.of(...))
.process(new Job1ProcessFunction());

DataStream job2 = source
.assignTimestampsAndWatermarks(...)
.keyBy(...)
.window(TumblingEventTimeWindows.of(...))
.process(new Job2ProcessFunction());

DataStream job3 = source
.assignTimestampsAndWatermarks(...)
.keyBy(...)
.window(TumblingEventTimeWindows.of(...))
.process(new Job3ProcessFunction());

// part I am curious about
// add the same sink object to each path
job1.addSink(sink);
job2.addSink(sink);
job3.addSink(sink);

env.execute();

Hopefully the situation I describe is clear (I omit a lot of details, so
please tell me if it isn't, or if any additional details would help).

I couldn't find any examples of situations like this while Googling or
reading the Flink docs: several processing pipelines (`job1`, `job2`, and
`job3` in my example code) all feeding into the same `SinkFunction` object.
The API docs don't really mention anything about this case either (again,
unless I missed it, in which case please point it out to me :) ).

The main concern I have is that concurrent calls from the different
pipelines might step on each other in some way; can anyone confirm or deny
that the concern is valid? Should I be using `DataStream.union` first? E.g.
like `job1.union(job2).union(job3).addSink(sink)`?
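
For concreteness, the union variant I have in mind would look roughly like
this (just a sketch, reusing the placeholder types from the snippet above):

    // merge the three pipelines into one stream, then attach a single sink instance
    DataStream merged = job1.union(job2, job3);
    merged.addSink(new MyCustomSNSSinkFunction("my-sns-topic", ...));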

Appreciate any advice people have :)

---

*Antony Southworth*
Data Engineer - Halter Limited



Re: re: Re: Reply: When will Flink SQL officially support Redis and MongoDB connectors?

2021-09-22 Thread casel.chen
> Our company has implemented a MongoDB connector based on the latest unified stream/batch interfaces. It supports both source and sink, lets you control the bulk-insert rate and the size of the buffered batch, converts between Mongo documents and Java objects, optionally does bulk updates, and uses the latest asynchronous Mongo driver. We will keep optimizing its performance. Could someone help push this forward and contribute the connector to the community?


A question: how does one get a self-developed connector added to the https://flink-packages.org/ site so that everyone can find it? Could you contribute your connector there?


On 2021-09-23 09:32:39, "2572805166" <2572805...@qq.com.INVALID> wrote:
> Our company has implemented a MongoDB connector based on the latest unified stream/batch interfaces. It supports both source and sink, lets you control the bulk-insert rate and the size of the buffered batch, converts between Mongo documents and Java objects, optionally does bulk updates, and uses the latest asynchronous Mongo driver. We will keep optimizing its performance. Could someone help push this forward and contribute the connector to the community?
>
>
> -- Original Message --
> From: "Yun Tang";
> Sent: 2021-09-22 10:55
> To: "user-zh@flink.apache.org";
> Subject: Re: Reply: When will Flink SQL officially support Redis and MongoDB connectors?
>
>
>
> Hi,
>
> Actually, the Flink community is currently not that keen on adding new officially supported connectors. The main reason is that the community's developers are limited and cannot afford to maintain too many connectors. In particular, some connector implementations require specific domain background, and it is hard to guarantee that the developers reviewing the code have that background; after all, everyone has to take responsibility for the code they approve.
> You can look around flink-packages [1], or consider implementing and maintaining one yourself (a basic implementation should not be very complex).
>
>
> [1] https://flink-packages.org/
>
>
> Best regards,
> Yun Tang
>
>
> From: 黑色
> Sent: Saturday, September 18, 2021 17:17
> To: user-zh@flink.apache.org
> Subject: Reply: When will Flink SQL officially support Redis and MongoDB connectors?
>
> You can define one yourself by referring to the source code. What you write yourself is truly yours; if you just use someone else's, it remains theirs.
>
>
>
>
> -- Original Message --
> From: "user-zh"
> Sent: Friday, September 17, 2021, 16:39
> To: "user-zh@flink.apache.org"
> Subject: When will Flink SQL officially support Redis and MongoDB connectors?
>
>
>
> Redis and MongoDB are often used in our work, but Flink has never officially provided standard connectors for them. When will they be officially released so that everyone can use them?
> PS: the bahir library has not been updated for a long time, and the Flink version it targets is too old.



Distinguishing per-job logs in Flink session mode

2021-09-22 Thread Ada Luna
Multiple jobs run in one session cluster. How can the logs of the different jobs be distinguished? Is there currently a good way to do this?


Resource leak when an exception is thrown in a Flink job using Redisson

2021-09-22 Thread a773807943
I encountered a problem while integrating Flink with Redisson. When the task
hits exceptions and keeps retrying, the number of Redis clients fluctuates
(sometimes it goes up, sometimes down) but the overall trend is growth. Even
though I shut down the Redisson instance by overriding the close() method, the
number of Redis clients cannot be prevented from growing further, and it
eventually reaches the upper limit and an error is thrown. Moreover, this only
happens when the job runs on a Flink cluster; in local mode the number of Redis
clients stays stable. The test code is below. Could you suggest the specific
cause of this situation and a solution? Thank you.

flink version: 1.13.2
redisson version: 3.16.1

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;

public class ExceptionTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(1000 * 60);

        DataStream<String> mock = createDataStream(env);
        mock.keyBy(x -> 1)
                .process(new ExceptionTestFunction())
                .uid("batch-query-key-process")
                .filter(x -> x != null)
                .print();

        env.execute("Exception-Test");
    }

    private static DataStream<String> createDataStream(StreamExecutionEnvironment env) {
        String topic = "test_topic_xhb03";
        Properties test = new Properties();
        test.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
        test.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), test);
        consumer.setStartFromLatest();
        DataStream<String> source = env.addSource(consumer);
        return source;
    }
}

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.redisson.Redisson;
import org.redisson.api.RedissonRxClient;
import org.redisson.config.Config;

@Slf4j
public class ExceptionTestFunction extends KeyedProcessFunction<Integer, String, String> {

    private RedissonRxClient redisson;

    @Override
    public void close() {
        this.redisson.shutdown();
        log.info(String.format(
                "Shut down redisson instance in close method, RedissonRxClient shutdown is %s",
                redisson.isShutdown()));
    }

    @Override
    public void open(Configuration parameters) {
        String prefix = "redis://";
        Config config = new Config();
        config.useSingleServer()
                .setClientName("xhb-redisson-main")
                .setTimeout(5000)
                .setConnectTimeout(1)
                .setConnectionPoolSize(4)
                .setConnectionMinimumIdleSize(2)
                .setIdleConnectionTimeout(1)
                .setAddress("127.0.0.1:6379")
                .setDatabase(0)
                .setPassword(null);
        this.redisson = Redisson.create(config).rxJava();
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        throw new NullPointerException("Null Pointer in ProcessElement");
    }
}





re: Re: Reply: When will Flink SQL officially support Redis and MongoDB connectors?

2021-09-22 Thread 2572805166
Our company has implemented a MongoDB connector based on the latest unified stream/batch interfaces. It supports both source and sink, lets you control the bulk-insert rate and the size of the buffered batch, converts between Mongo documents and Java objects, optionally does bulk updates, and uses the latest asynchronous Mongo driver. We will keep optimizing its performance. Could someone help push this forward and contribute the connector to the community?


-- Original Message --
From: "Yun Tang";
Sent: 2021-09-22 10:55
To: "user-zh@flink.apache.org";
Subject: Re: Reply: When will Flink SQL officially support Redis and MongoDB connectors?



Hi,

Actually, the Flink community is currently not that keen on adding new officially supported connectors. The main reason is that the community's developers are limited and cannot afford to maintain too many connectors. In particular, some connector implementations require specific domain background, and it is hard to guarantee that the developers reviewing the code have that background; after all, everyone has to take responsibility for the code they approve.
You can look around flink-packages [1], or consider implementing and maintaining one yourself (a basic implementation should not be very complex).


[1] https://flink-packages.org/


Best regards,
Yun Tang


From: 黑色
Sent: Saturday, September 18, 2021 17:17
To: user-zh@flink.apache.org
Subject: Reply: When will Flink SQL officially support Redis and MongoDB connectors?

You can define one yourself by referring to the source code. What you write yourself is truly yours; if you just use someone else's, it remains theirs.




-- Original Message --
From: "user-zh"


Reply: re: Reply: Does Flink SQL support dynamically creating sink tables?

2021-09-22 Thread ??????
sql??sql??

Sent from my iPhone


-- Original Message --
From: 2572805166 <2572805...@qq.com.INVALID>
Sent: 2021-09-23 09:23
To: user-zh
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like
> casel.chen wrote on 2021-09-18 08:27: The upstream Kafka topic messages carry a user-type field, and we want to route data to different topics depending on the user type (without hard-coding the types). Does Flink SQL support dynamically creating sink tables?


re: Reply: Does Flink SQL support dynamically creating sink tables?

2021-09-22 Thread 2572805166
Use Java dynamic compilation and class loading to achieve hot loading, similar to a web project.


-- Original Message --
From: "JasonLee" <17610775...@163.com>;
Sent: 2021-09-22 22:33
To: "user-zh@flink.apache.org";
Subject: Reply: Does Flink SQL support dynamically creating sink tables?



hi. Actually this has little to do with building the graph, and nothing needs to be adjusted after the graph is built. When constructing the producer, simply don't hard-code the topic: implement a custom KafkaSerializationSchema serialization class and take the topic dynamically from the record. Best, JasonLee

On 2021-09-22 19:48, spoon_lz wrote: "It can be done in a DataStream API job": is that really feasible? My understanding is that Flink has to build the graph before the job can run, and once the graph is built it probably cannot be adjusted dynamically anymore, unless you write a custom sink and implement the logic yourself.

On 2021-09-22 19:25, JasonLee <17610775...@163.com> wrote: hi. As I understand it, this currently cannot be done in a SQL job, but it can be done in a DataStream API job. Best, JasonLee

On 2021-09-22 11:35, 酷酷的浑蛋 wrote: I have this requirement too. The idea is that a new kind of log starts appearing in the topic in real time, and we want to dynamically create a topic table for the new log and write to that new topic table, all within one job. | apache22 | apach...@163.com | Signature customized by NetEase Mail Master

On 2021-09-22 11:23, Caizhi Weng wrote: Hi! I don't quite understand the requirement, but if you want to send to different topics, you need to define a DDL for each topic. If the schemas of the topics are largely the same and only a few fields and the topic name differ, take a look at the DDL LIKE syntax:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like

casel.chen wrote on Saturday, 2021-09-18 at 08:27: The upstream Kafka topic messages carry a user-type field, and we now want to route data to different topics depending on the user type (we don't want to hard-code the set of types, for extensibility). Does Flink SQL support dynamically creating sink tables?
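
For reference, the dynamic-topic idea JasonLee describes above would look roughly like the
sketch below. The record type (a Tuple2 of target topic and payload) and the default topic
name are assumptions made for illustration, not details from this thread:

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Routes each record to the topic carried inside the record itself (f0 = topic, f1 = payload).
    public class DynamicTopicSchema implements KafkaSerializationSchema<Tuple2<String, String>> {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> element, Long timestamp) {
            return new ProducerRecord<>(element.f0, element.f1.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Usage: the default topic passed to the producer is only a fallback; each record names its own topic.
    // FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<>(
    //         "default-topic", new DynamicTopicSchema(), kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);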

Kafka Partition Discovery

2021-09-22 Thread Mason Chen
Hi all,

We are sometimes facing a connection issue with Kafka when a broker restarts

```
java.lang.RuntimeException:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:846)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired
while fetching topic metadata
```

Can a retry be added to the partition discovery mechanism?

Best,
Mason
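
For reference, we enable partition discovery on the FlinkKafkaConsumer roughly like this;
the interval, topic and broker values below are only illustrative, not our real settings:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("group.id", "my-group");
    // check for newly added partitions every 60 seconds (example value)
    props.setProperty("flink.partition-discovery.interval-millis", "60000");

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);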


Re: Not able to avoid Dynamic Class Loading

2021-09-22 Thread Kevin Lam
Sorry for the late reply here, I'm just returning to this now.

Interesting re: the avro version, we're using 1.10.0 in our application
jar. But maybe this is somehow being clobbered when we try to move it into
/lib vs. /usrlib to avoid dynamic class loading. Is it possible that's
happening?

On Fri, Aug 27, 2021 at 2:28 PM Arvid Heise  wrote:

> I guess the best option is to attach a debugger and set a breakpoint at
> the NotSerializableException. There definitively has to be a
> non-serializable component in that FlinkKafkaConsumer and it can only come
> from the DeserializationSchema or Properties.
> Maybe the consumer internally caches some values generated by your schema
> at some point but I couldn't think anything obvious. There is a high chance
> that it comes by your code and only activates on cluster.
> It would be nice to hear back from you when you have found that respective
> field. It should be 2 object references deep in FlinkKafkaConsumer (2
> writeObject0 before the first writeArray that most likely corresponds to
> your RecordSchema)
>
> Btw which Avro version are you using? It looks like Avro 1.10.X finally
> has serializable schema... Maybe this might also explain why it works in
> one submission and not in the other?
>
> On Fri, Aug 27, 2021 at 4:10 PM Kevin Lam  wrote:
>
>> There's no inner classes, and none of the fields
>> of DebeziumAvroRegistryDeserializationSchema have an Avro schema, even when
>> expanded, including KafkaClusterConfig. KafkaClusterConfig is just composed
>> of Strings and Booleans.
>>
>> DebeziumAvroRegistryDeserializationSchema has a field that initializes a
>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient but
>> this is marked @transient and lazy in Scala, similarly the deserializer
>> uses that client to initialize a transient+lazy field which builds a
>> KafkaAvroDeserializer
>>
>>>
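
For reference, the "@transient + lazy" pattern discussed above corresponds in plain Java to
creating the client inside open(), so that it never goes through Java serialization. A minimal
sketch (MyRegistryClient and its methods are hypothetical; only the shape of the pattern matters):

    public class EnrichFunction extends RichMapFunction<String, String> {
        private final String registryUrl;           // plain serializable configuration
        private transient MyRegistryClient client;  // hypothetical client, never serialized

        public EnrichFunction(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        @Override
        public void open(Configuration parameters) {
            // created on the task manager, after the function has been deserialized
            client = new MyRegistryClient(registryUrl);
        }

        @Override
        public String map(String value) {
            return client.resolve(value);
        }
    }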


Re: Support ARM architecture

2021-09-22 Thread Robert Metzger
Hi,

AFAIK the only real blocker for ARM support was a RocksDB binary for ARM.
This has been resolved and is scheduled to be released with 1.14.0:
https://issues.apache.org/jira/browse/FLINK-13598

If you have an ARM machine available, you could even help the community in
the release verification process and test the current release candidate:
https://lists.apache.org/thread.html/r21b181aff057d17c81839190e4e413574e07f36eeb8e59799392de2e%40%3Cdev.flink.apache.org%3E


On Wed, Sep 22, 2021 at 7:37 PM Patrick Angeles 
wrote:

> Hey all,
>
> Trying to follow FLINK-13448. Seems like all the subtasks, save for one on
> documentation, are completed... does this mean there will be an arm64
> binary available in the next release (1.14)?
>


Re: Many S3V4AuthErrorRetryStrategy warn logs while reading/writing from S3

2021-09-22 Thread Robert Metzger
Hey Andreas,

This could be related too
https://github.com/apache/hadoop/pull/110/files#diff-0a2e55a2f79ea4079eb7b77b0dc3ee562b383076fa0ac168894d50c80a95131dR950

I guess in Flink this would be

s3.endpoint: your-endpoint-hostname

Where your-endpoint-hostname is a region-specific endpoint, which you can
probably look up from the S3 docs.
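
For example, for a bucket in eu-central-1 the flink-conf.yaml entry might look like this
(the region is only an illustration; use the endpoint of your bucket's region):

    s3.endpoint: s3.eu-central-1.amazonaws.com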


On Wed, Sep 22, 2021 at 7:07 PM Hailu, Andreas  wrote:

> Hi,
>
>
>
> When reading/writing to and from S3 using the flink-fs-s3-hadoop plugin on
> 1.11.2, we observe a lot of these WARN log statements in the logs:
>
>
>
> WARN  S3V4AuthErrorRetryStrategy - Attempting to re-send the request to
> s3.amazonaws.com with AWS V4 authentication. To
> avoid this warning in the future, please use region-specific endpoint to
> access buckets located in regions that require V4 signing.
>
>
>
> The applications complete successfully which is great, but I’m not sure
> what the root of the error is and I’m hesitant to silence it through our
> logging configurations. I saw something that looks similar here[1]. Is
> there a way for us to similarly have Flink’s AWS S3 client to use V4
> strategy to begin with?
>
>
>
> [1]
> https://stackoverflow.com/questions/39513518/aws-emr-writing-to-kms-encrypted-s3-parquet-files
>
>
>
> 
>
>
>
> *Andreas Hailu*
>
> *Data Lake Engineering *| Goldman Sachs & Co.
>
>
>
> --
>
>


Re: flink rest endpoint creation failure

2021-09-22 Thread Robert Metzger
Hi,

Yes, "rest.bind-port" seems to be set to "35485" on the JobManager
instance. Can you double check the configuration that is used by Flink?
The jobManager is also printing the effective configuration on start up.
You'll probably see the value there as well.
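
For reference, rest.bind-port also accepts a port range or a list of ports, which avoids the
clash when YARN places both JobManagers on the same machine; a flink-conf.yaml sketch (the
range values are just an example):

    rest.bind-port: 50100-50200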


On Wed, Sep 22, 2021 at 6:48 PM Curt Buechter  wrote:

> Hi,
> I'm getting an error that happens randomly when starting a flink
> application.
>
> For context, this is running in YARN on AWS. This application is one that
> converts from the Table API to the Stream API, so two flink
> applications/jobmanagers are trying to start up. I think what happens is
> that the rest api port is chosen, and is the same for both of the flink
> apps. If YARN chooses two different instances for the two task managers,
> they each work fine and start their rest api on the same port on their own
> respective machine. But, if YARN chooses the same instance for both job
> managers, they both try to start up the rest api on the same port on the
> same machine, and I get the error.
>
> Here is the error:
>
> 2021-09-22 15:47:27,724 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not 
> start cluster entrypoint YarnJobClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
>  [flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
>  [flink-dist_2.12-1.13.2.jar:1.13.2]
> Caused by: org.apache.flink.util.FlinkException: Could not create the 
> DispatcherResourceManagerComponent.
>   at 
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at java.security.AccessController.doPrivileged(Native Method) 
> ~[?:1.8.0_282]
>   at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>  ~[hadoop-common-3.2.1-amzn-3.jar:?]
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   ... 2 more
> Caused by: java.net.BindException: Could not start rest endpoint on any port 
> in port range 35485
>   at 
> org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at java.security.AccessController.doPrivileged(Native Method) 
> ~[?:1.8.0_282]
>   at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>  ~[hadoop-common-3.2.1-amzn-3.jar:?]
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   ... 2 more
>
>
> And, here is part of the log from the other job manager, which successfully 
> started its rest api on the same port, just a few seconds earlier:
>
>
> 2021-09-22 15:47:20,690 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Rest 
> endpoint listening at ip-10-1-2-137.ec2.internal:35485
> 2021-09-22 15:47:20,691 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - 
> http://ip-10-1-2-137.ec2.internal:35485 was granted leadership with 
> 

Support ARM architecture

2021-09-22 Thread Patrick Angeles
Hey all,

Trying to follow FLINK-13448. Seems like all the subtasks, save for one on
documentation, are completed... does this mean there will be an arm64
binary available in the next release (1.14)?


Many S3V4AuthErrorRetryStrategy warn logs while reading/writing from S3

2021-09-22 Thread Hailu, Andreas
Hi,

When reading/writing to and from S3 using the flink-fs-s3-hadoop plugin on 
1.11.2, we observe a lot of these WARN log statements in the logs:

WARN  S3V4AuthErrorRetryStrategy - Attempting to re-send the request to 
s3.amazonaws.com with AWS V4 authentication. To avoid this warning in the 
future, please use region-specific endpoint to access buckets located in 
regions that require V4 signing.

The applications complete successfully which is great, but I'm not sure what 
the root of the error is and I'm hesitant to silence it through our logging 
configurations. I saw something that looks similar here[1]. Is there a way for 
us to similarly have Flink's AWS S3 client to use V4 strategy to begin with?

[1] 
https://stackoverflow.com/questions/39513518/aws-emr-writing-to-kms-encrypted-s3-parquet-files



Andreas Hailu
Data Lake Engineering | Goldman Sachs & Co.






flink rest endpoint creation failure

2021-09-22 Thread Curt Buechter
Hi,
I'm getting an error that happens randomly when starting a flink
application.

For context, this is running in YARN on AWS. This application is one that
converts from the Table API to the Stream API, so two flink
applications/jobmanagers are trying to start up. I think what happens is
that the rest api port is chosen, and is the same for both of the flink
apps. If YARN chooses two different instances for the two task managers,
they each work fine and start their rest api on the same port on their own
respective machine. But, if YARN chooses the same instance for both job
managers, they both try to start up the rest api on the same port on the
same machine, and I get the error.

Here is the error:

2021-09-22 15:47:27,724 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed
to initialize the cluster entrypoint YarnJobClusterEntrypoint.
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
[flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.util.FlinkException: Could not create the
DispatcherResourceManagerComponent.
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_282]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
~[hadoop-common-3.2.1-amzn-3.jar:?]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
... 2 more
Caused by: java.net.BindException: Could not start rest endpoint on
any port in port range 35485
at 
org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.security.AccessController.doPrivileged(Native Method) 
~[?:1.8.0_282]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
~[hadoop-common-3.2.1-amzn-3.jar:?]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
... 2 more


And, here is part of the log from the other job manager, which
successfully started its rest api on the same port, just a few seconds
earlier:


2021-09-22 15:47:20,690 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
Rest endpoint listening at ip-10-1-2-137.ec2.internal:35485
2021-09-22 15:47:20,691 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
http://ip-10-1-2-137.ec2.internal:35485 was granted leadership with
leaderSessionID=----
2021-09-22 15:47:20,692 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Web
frontend listening at http://ip-10-1-2-137.ec2.internal:35485.



Do you know of any configuration that would assist with this? I
thought about rest.bind-port, but the rest port already seems to be
chosen dynamically. My config file has that setting commented out.


Thanks


pyflink keyed stream checkpoint error

2021-09-22 Thread Curt Buechter
Hi,
I'm getting an error after enabling checkpointing in my pyflink application
that uses a keyed stream and rocksdb state.

Here is the error message:

2021-09-22 16:18:14,408 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] -
Closed RocksDB State Backend. Cleaning up RocksDB working directory
/mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task [] -
KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831)
switched from RUNNING to FAILED with failure cause: java.io.IOException:
Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed
(1/1)#34.
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
at
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
... 19 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
CANCELLED: cancelled before receiving half close
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at

Re: Unbounded Kafka Source

2021-09-22 Thread Robert Cullen
Robert,

So removing the setUnbounded(OffsetsInitializer.latest()) call fixed the issue.
Thanks!
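
For reference, the working builder then looks roughly like this; apart from dropping the
boundedness call it mirrors the snippet quoted below, and the record type and group id are
placeholders:

    KafkaSource<FluentdRecord> dataSource = KafkaSource.<FluentdRecord>builder()
            .setBootstrapServers(kafkaServer)
            .setTopics(Arrays.asList("fluentd"))
            .setGroupId("my-group")
            .setDeserializer(new FluentdRecordDeserializer())
            .setStartingOffsets(OffsetsInitializer.earliest())
            // no setBounded()/setUnbounded(): the source stays a never-ending streaming source
            .build();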

On Wed, Sep 22, 2021 at 11:51 AM Robert Metzger  wrote:

> Hi,
>
> What happens if you do not set any boundedness on the KafkaSource?
> For a DataStream job in streaming mode, the Kafka source should be
> unbounded.
>
> From reading the code, it seems that setting unbounded(latest) should not
> trigger the behavior you mention ... but the Flink docs are not clearly
> written [1], as it says that you can make a Kafka source bounded by calling
> "setUnbounded" ... which is weird, because "setUnbounded" should not make
> something bounded?!
>
> Are there any log messages from the Source that can give us any hints?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#boundedness
>
> On Wed, Sep 22, 2021 at 5:37 PM Robert Cullen 
> wrote:
>
>> I have an unbounded kafka source that has records written to it every
>> second.  Instead of the job waiting to process the new messages it closes.
>> How do I keep the stream open?
>>
>> KafkaSource dataSource = KafkaSource
>> .builder()
>> .setBootstrapServers(kafkaServer)
>> .setTopics(Arrays.asList("fluentd"))
>> .setGroupId("")
>> .setDeserializer(new FluentdRecordDeserializer())
>> //.setStartingOffsets(OffsetsInitializer.earliest())
>> //.setBounded(OffsetsInitializer.latest())
>> .setUnbounded(OffsetsInitializer.latest())
>> .build();
>>
>>
>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490


RE: Unbounded Kafka Source

2021-09-22 Thread Schwalbe Matthias
Hi,

If I remember right, this is actually the intended behaviour:

In batch mode: .setBounded(…)
In streaming mode: source that finishes anyway at set offset: use 
.setUnbounded(…)
In streaming mode: source that never finishes: don’t set a final offset (don’t 
.setUnbounded(…))

I might be mistaken …

Thias


From: Robert Metzger 
Sent: Mittwoch, 22. September 2021 17:51
To: Robert Cullen 
Cc: user 
Subject: Re: Unbounded Kafka Source

Hi,

What happens if you do not set any boundedness on the KafkaSource?
For a DataStream job in streaming mode, the Kafka source should be unbounded.

From reading the code, it seems that setting unbounded(latest) should not 
trigger the behavior you mention ... but the Flink docs are not clearly written 
[1], as it says that you can make a Kafka source bounded by calling 
"setUnbounded" ... which is weird, because "setUnbounded" should not make 
something bounded?!

Are there any log messages from the Source that can give us any hints?

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#boundedness

On Wed, Sep 22, 2021 at 5:37 PM Robert Cullen 
mailto:cinquate...@gmail.com>> wrote:
I have an unbounded kafka source that has records written to it every second.  
Instead of the job waiting to process the new messages it closes.  How do I 
keep the stream open?


KafkaSource dataSource = KafkaSource
.builder()
.setBootstrapServers(kafkaServer)
.setTopics(Arrays.asList("fluentd"))
.setGroupId("")
.setDeserializer(new FluentdRecordDeserializer())
//.setStartingOffsets(OffsetsInitializer.earliest())
//.setBounded(OffsetsInitializer.latest())
.setUnbounded(OffsetsInitializer.latest())
.build();



--
Robert Cullen
240-475-4490


Re: Unbounded Kafka Source

2021-09-22 Thread Robert Metzger
Hi,

What happens if you do not set any boundedness on the KafkaSource?
For a DataStream job in streaming mode, the Kafka source should be
unbounded.

From reading the code, it seems that setting unbounded(latest) should not
trigger the behavior you mention ... but the Flink docs are not clearly
written [1], as it says that you can make a Kafka source bounded by calling
"setUnbounded" ... which is weird, because "setUnbounded" should not make
something bounded?!

Are there any log messages from the Source that can give us any hints?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#boundedness

On Wed, Sep 22, 2021 at 5:37 PM Robert Cullen  wrote:

> I have an unbounded kafka source that has records written to it every
> second.  Instead of the job waiting to process the new messages it closes.
> How do I keep the stream open?
>
> KafkaSource dataSource = KafkaSource
> .builder()
> .setBootstrapServers(kafkaServer)
> .setTopics(Arrays.asList("fluentd"))
> .setGroupId("")
> .setDeserializer(new FluentdRecordDeserializer())
> //.setStartingOffsets(OffsetsInitializer.earliest())
> //.setBounded(OffsetsInitializer.latest())
> .setUnbounded(OffsetsInitializer.latest())
> .build();
>
>
>
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Flink Performance Issue

2021-09-22 Thread Robert Metzger
Hi Kamaal,

I would first suggest understanding the performance bottleneck, before
applying any optimizations.

Idea 1: Are your CPUs fully utilized?
If yes, good, then scaling up will probably help.
If not, then there's another inefficiency.

Idea 2: How fast can you get the data into your job, without any processing?
You can measure this by submitting a simple Flink job that just reads the
data and writes it to a discarding sink. Either disable the operator
chaining to get metrics for the records per second, or add a custom mapper
in between that measures the throughput.
Ideally you see here that you can read all your data in a few seconds, if
not, then there's a problem getting your data in.
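
A minimal sketch of such a measuring job (topic, broker and deserializer are placeholders
for whatever your real job uses):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.disableOperatorChaining(); // so the source's records-per-second metric is visible on its own

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("group.id", "throughput-test");

    env.addSource(new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props))
       .addSink(new DiscardingSink<>()); // read everything, do nothing with it

    env.execute("read-only throughput test");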

Idea 3: Is your IO fully utilized? (If you are checkpointing to RocksDB,
the disk can dramatically slow you down.)
Idea 4: Are you under high memory pressure, and your JVMs are spending most
of their cycles garbage collecting?

My bet is you are not getting data into your cluster as fast as you think
(Idea 2)


On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <
mohammed.kamaa...@gmail.com> wrote:

> Hi Arvid,
>
> The throughput has decreased further after I removed all the rebalance().
> The performance has decreased from 14 minutes for 20K messages to 20
> minutes for 20K messages.
>
> Below are the tasks that the flink application is performing. I am using
> keyBy and Window operation. Do you think am I making any mistake here or
> the way I am performing the keyBy or Window operation needs to be
> corrected?.
>
> //Add Source
> StreamExecutionEnvironment streamenv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> initialStreamData = streamenv.addSource(new
> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
> new *ObjectNodeJsonDeSerializerSchema()*,
> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>
> DataStream cgmStreamData = initialStreamData.keyBy(value ->
> value.findValue("PERSON_ID").asText())
> .flatMap(new *SgStreamingTask()*).setParallelism(Common.FORTY_FIVE);
>
> DataStream artfctOverlapStream = cgmStreamData.keyBy(new
> CGMKeySelector()).countWindow(2, 1)
> .apply(new *ArtifactOverlapProvider()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> DataStream streamWithSgRoc = artfctOverlapStream.keyBy(new
> CGMKeySelector()).countWindow(7, 1)
> .apply(new *SgRocProvider()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> DataStream cgmExcursionStream =
> streamWithSgRoc.keyBy(new CGMKeySelector())
> .countWindow(Common.THREE, Common.ONE).apply(new
> *CGMExcursionProviderStream()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> //Add Sink
> cgmExcursionStream.addSink(new FlinkKafkaProducer(
> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new
> CGMDataCollectorSchema(),
> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>
> *Implementation classes:-*
>
> //deserialize the json message received
> *ObjectNodeJsonDeSerializerSchema* implements
> KeyedDeserializationSchema{
> public ObjectNode deserialize(byte[] messageKey, byte[] message, String
> topic, int partition, long offset);
> }
>
> //Flapmap to check each message and apply validation
> public class *SgStreamingTask* extends RichFlatMapFunction CGM> {
> void flatMap(ObjectNode streamData, Collector out);
> }
>
> //persist three state variables and apply business logic
> public class *ArtifactOverlapProvider* extends RichFlatMapFunction Tuple2>
> implements WindowFunction {
> public void apply(String key, GlobalWindow window, Iterable values,
> Collector out);
> }
>
> //Apply business logic
> public class *SgRocProvider* implements WindowFunction GlobalWindow>{
> public void apply(String key, GlobalWindow window, Iterable values,
> Collector out);
> }
>
> //persist 3 state variables and apply business logic
> public class *CGMExcursionProviderStream* extends
> RichFlatMapFunction>
> implements WindowFunction{
> public void apply(String key, GlobalWindow window, Iterable values,
> Collector out);
>
> }
>
> Thanks
> Kamaal
>
>
> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise  wrote:
>
>> Hi Mohammed,
>>
>> something is definitely wrong in your setup. You can safely say that you
>> can process 1k records per second and core with Kafka and light processing,
>> so you shouldn't even need to go distributed in your case.
>>
>> Do you perform any heavy computation? What is your flatMap doing? Are you
>> emitting lots of small records from one big record?
>>
>> Can you please remove all rebalance and report back? Rebalance is
>> counter-productive if you don't exactly know that you need it.
>>
>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <
>> mohammed.kamaa...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Just an update,
>>>
>>> Problem 2:-
>>> 
>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>> It is resolved. It was because we exceeded the number of allowed
>>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>>> unused topics 

Re: Observability tools on top of Flink

2021-09-22 Thread Deepak Sharma
I am interested in learning about this as well, as I need to add
observability to a Flink pipeline.

On Wed, 22 Sep 2021 at 8:40 PM, Dan Hill  wrote:

> Hi!
>
> I saw a recent Medium article
> 
> by Pinterest on a diagnostic tool called DrSquirrel.  Looks great.
>
> What do other teams do for similar problems?
>
> E.g. Has anyone piped their Flink metadata into Apache Thirdeye?
>
> - Dan
>
-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Can't access Debezium metadata fields in Kafka table

2021-09-22 Thread Harshvardhan Shinde
Hi,
I'm trying to access the metadata columns from the debezium source
connector as documented here

.
However I'm getting the following error when I try to select the rows from
the kafka table:

flink.table.api.ValidationException: Invalid metadata key
'value.ingestion-timestamp' in column 'origin_ts'

Getting the same issue for all the *virtual* columns. Please let me know
what I'm doing wrong.

Here's my table creation query:

CREATE TABLE testFlink (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP METADATA FROM
'value.source.properties' VIRTUAL,
  id BIGINT,
  number BIGINT,
  created_at BIGINT,
  updated_at BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source-staging-postgres-flink_test-82-2021-09-20.public.test',
  'properties.bootstrap.servers' = ':9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-avro-confluent',
  'value.debezium-avro-confluent.schema-registry.url' =
':8081'
);

Thanks.


Observability tools on top of Flink

2021-09-22 Thread Dan Hill
Hi!

I saw a recent Medium article

by Pinterest on a diagnostic tool called DrSquirrel.  Looks great.

What do other teams do for similar problems?

E.g. Has anyone piped their Flink metadata into Apache Thirdeye?

- Dan



Reply: Does Flink SQL support dynamically creating sink tables?

2021-09-22 Thread spoon_lz


"It can be done in a DataStream API job": is that really feasible? My understanding is that Flink has to build the graph before the job can run, and once the graph is built it probably cannot be adjusted dynamically anymore, unless you write a custom sink and implement the logic yourself.


On 2021-09-22 19:25, JasonLee <17610775...@163.com> wrote:
hi

As I understand it, this currently cannot be done in a SQL job, but it can be done in a DataStream API job.

Best
JasonLee
On 2021-09-22 11:35, 酷酷的浑蛋 wrote:
I have this requirement too. The idea is that a new kind of log starts appearing in the topic in real time, and we want to dynamically create a topic table for the new log and write to that new topic table, all within one job.

apache22 | apach...@163.com | Signature customized by NetEase Mail Master

On 2021-09-22 11:23, Caizhi Weng wrote:
Hi!

I don't quite understand the requirement, but if you want to send to different topics, you need to define a DDL for each topic.

If the schemas of the topics are largely the same and only a few fields and the topic name differ, take a look at the DDL LIKE syntax:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like

casel.chen wrote on Saturday, 2021-09-18 at 08:27:

The upstream Kafka topic messages carry a user-type field, and we now want to route data to different topics depending on the user type (we don't want to hard-code the set of types, for extensibility). Does Flink SQL support dynamically creating sink tables?


Reply: Does Flink SQL support dynamically creating sink tables?

2021-09-22 Thread JasonLee
hi

As I understand it, this currently cannot be done in a SQL job, but it can be done in a DataStream API job.

Best
JasonLee
On 2021-09-22 11:35, 酷酷的浑蛋 wrote:
I have this requirement too. The idea is that a new kind of log starts appearing in the topic in real time, and we want to dynamically create a topic table for the new log and write to that new topic table, all within one job.

apache22 | apach...@163.com | Signature customized by NetEase Mail Master

On 2021-09-22 11:23, Caizhi Weng wrote:
Hi!

I don't quite understand the requirement, but if you want to send to different topics, you need to define a DDL for each topic.

If the schemas of the topics are largely the same and only a few fields and the topic name differ, take a look at the DDL LIKE syntax:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like

casel.chen wrote on Saturday, 2021-09-18 at 08:27:

The upstream Kafka topic messages carry a user-type field, and we now want to route data to different topics depending on the user type (we don't want to hard-code the set of types, for extensibility). Does Flink SQL support dynamically creating sink tables?


Reply: Uneven consumption across Kafka partitions in Flink

2021-09-22 Thread JasonLee
hi


The image doesn't come through. I'd guess there are roughly two situations. First, the source itself is skewed: a few partitions simply contain more data than the others, in which case the partitioning strategy used when writing to Kafka needs to be changed so the data is distributed as evenly as possible. Second, data skew (or some other cause) in the downstream computation back-pressures the job all the way to the source; that case needs different solutions depending on the actual situation. Simply increasing the parallelism or changing the number of slots has little effect.


Best
JasonLee
On 2021-09-22 09:22, casel.chen wrote:
The Kafka topic has 32 partitions and the streaming job consumes it with a parallelism of 32. Monitoring now shows a serious message backlog on some partitions (see the table below). What could cause this, and is there a way to fix it? Would increasing the number of partitions also fail to cure it?
PS: The message counts per partition are indeed somewhat uneven, but partitions with similar message counts also show different backlogs (e.g. 15, 16, 17, 18). Could it be caused by limited node bandwidth? Currently numberOfSlots=8; would changing it to numberOfSlots=1 help?


Partition ID | Client | Max offset  | Consumed offset | Backlog
0            | n/a    | 14,131,397  | 14,130,923      | 474
1            | n/a    | 14,191,455  | 14,189,396      | 2,059
2            | n/a    | 14,611,826  | 14,610,262      | 1,564
3            | n/a    | 15,340,150  | 15,335,944      | 4,206
4            | n/a    | 16,379,487  | 16,372,237      | 7,250
5            | n/a    | 17,696,565  | 17,639,308      | 57,257
6            | n/a    | 19,200,829  | 19,129,856      | 70,973
7            | n/a    | 20,889,954  | 20,888,652      | 1,302
8            | n/a    | 22,643,539  | 22,536,468      | 107,071
9            | n/a    | 24,440,881  | 24,439,357      | 1,524
10           | n/a    | 26,178,250  | 26,073,197      | 105,053
11           | n/a    | 27,828,497  | 27,670,732      | 157,765
12           | n/a    | 29,284,463  | 29,283,105      | 1,358
13           | n/a    | 30,526,020  | 29,781,704      | 744,316
14           | n/a    | 31,468,482  | 31,467,243      | 1,239
15           | n/a    | 32,084,198  | 31,467,610      | 616,588
16           | n/a    | 32,393,752  | 32,019,836      | 373,916
17           | n/a    | 32,302,065  | 32,141,999      | 160,066
18           | n/a    | 31,875,063  | 31,874,452      | 611
19           | n/a    | 31,137,894  | 31,002,867      | 135,027
20           | n/a    | 30,098,926  | 29,930,855      | 168,071
21           | n/a    | 28,739,235  | 28,603,509      | 135,726
22           | n/a    | 27,221,026  | 27,220,821      | 205
23           | n/a    | 25,514,265  | 25,382,536      | 131,729
24           | n/a    | 23,779,714  | 23,689,296      | 90,418
25           | n/a    | 21,981,307  | 21,981,267      | 40
26           | n/a    | 20,237,925  | 20,223,880      | 14,045
27           | n/a    | 18,606,490  | 18,606,387      | 103
28           | n/a    | 17,178,098  | 17,177,971      | 127
29           | n/a    | 15,972,292  | 15,972,105      | 187
30           | n/a    | 15,032,355  | 15,032,138      | 217
31           | n/a    | 14,426,366  | 14,425,462      | 904

Re: S3 access permission error

2021-09-22 Thread Harshvardhan Shinde
Hi,
I was facing the same issue. The best way to solve it is to use an IAM
role (which is the recommended approach) instead of access keys.
Hope this helps.

On Wed, Sep 22, 2021 at 1:32 PM Yangze Guo  wrote:

> I'm not an expert on S3. If it is not a credential issue, have you
> finish the checklist of this doc[1]?
>
> [1]
> https://aws.amazon.com/premiumsupport/knowledge-center/emr-s3-403-access-denied/?nc1=h_ls
>
> Best,
> Yangze Guo
>
> On Wed, Sep 22, 2021 at 3:39 PM Dhiru  wrote:
> >
> >
> > Not sure @yangze ... but other services deployed in the same place are able
> > to access the S3 bucket. The link you shared is the recommended way; if we
> > already have access to S3, should we not need to pass credentials?
> >
> > On Wednesday, September 22, 2021, 02:59:05 AM EDT, Yangze Guo <
> karma...@gmail.com> wrote:
> >
> >
> > You might need to configure the access credential. [1]
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Sep 22, 2021 at 2:17 PM Dhiru  wrote:
> > >
> > >
> > > I see org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326): the
> > > plugin is not able to create the folder; not sure if I need to change something.
> > > Whereas when we run from the local laptop and pass AWS credentials, it is
> > > able to create the folder and runs as expected.
> > > On Wednesday, September 22, 2021, 01:39:04 AM EDT, Dhiru <
> userdh...@yahoo.com> wrote:
> > >
> > >
> > > flink image I have added both s3 plugin
> > > FROM flink:1.11.3-scala_2.12-java11
> > > RUN mkdir ./plugins/flink-s3-fs-presto
> > > RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar
> ./plugins/flink-s3-fs-presto/
> > > RUN mkdir ./plugins/flink-s3-fs-hadoop
> > > RUN cp ./opt/flink-s3-fs-hadoop-1.11.3.jar
> ./plugins/flink-s3-fs-hadoop/
> > >
> > > some part of flink-conf.yaml  ( I tried with both s3a and s3  )
> > ># REQUIRED: set storage location for job metadata in remote storage
> > >  state.backend: filesystem
> > >  state.backend.fs.checkpointdir:
> s3a://msc-actigraph-test-bucket/flink-checkpointing/checkpoints
> > >  state.checkpoints.dir:
> s3a://msc-actigraph-test-bucket/flink-checkpointing/externalized-checkpoints
> > >  state.savepoints.dir:
> s3a://msc-actigraph-test-bucket/flink-checkpointing/savepoints
> > >  high-availability.storageDir:
> s3a://msc-actigraph-test-bucket/flink-checkpointing/storagedir
> > >  s3.path.style.access: true
> > >
> > > org.apache.flink.runtime.rest.handler.RestHandlerException: Could not
> execute application. at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
> at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown
> Source) at
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown
> Source) at
> java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown
> Source) at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
> Source) at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
> Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source) at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source) at java.base/java.lang.Thread.run(Unknown Source) Caused by:
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not execute application.
> at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
> Source) at
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown
> Source) ... 7 more Caused by: org.apache.flink.util.FlinkRuntimeException:
> Could not execute application. at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
> ... 7 more Caused by:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Failed to execute job 'DeduplicationJob'. at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
> ... 9 more Caused by: org.apache.flink.util.FlinkException: Failed to
> execute job 

Re: Flink Performance Issue

2021-09-22 Thread Mohammed Kamaal
Hi Arvid,

The throughput has decreased further after I removed all the rebalance(). The 
performance has decreased from 14 minutes for 20K messages to 20 minutes for 
20K messages.

Below are the tasks that the Flink application performs. I am using keyBy
and window operations. Do you think I am making a mistake here, or does the
way I perform the keyBy or window operations need to be corrected?

//Add Source
StreamExecutionEnvironment streamenv = 
StreamExecutionEnvironment.getExecutionEnvironment();
initialStreamData = streamenv.addSource(new 
FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
new ObjectNodeJsonDeSerializerSchema(), 
kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);

DataStream cgmStreamData = initialStreamData.keyBy(value -> 
value.findValue("PERSON_ID").asText())
.flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);

DataStream artfctOverlapStream = cgmStreamData.keyBy(new 
CGMKeySelector()).countWindow(2, 1)
.apply(new 
ArtifactOverlapProvider()).setParallelism(Common.FORTY_FIVE).rebalance();

DataStream streamWithSgRoc = artfctOverlapStream.keyBy(new 
CGMKeySelector()).countWindow(7, 1)
.apply(new SgRocProvider()).setParallelism(Common.FORTY_FIVE).rebalance();

DataStream cgmExcursionStream = streamWithSgRoc.keyBy(new 
CGMKeySelector())
.countWindow(Common.THREE, Common.ONE).apply(new 
CGMExcursionProviderStream()).setParallelism(Common.FORTY_FIVE).rebalance();

//Add Sink
cgmExcursionStream.addSink(new FlinkKafkaProducer(
topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new 
CGMDataCollectorSchema(),
kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);

Implementation classes:-

//deserialize the json message received
ObjectNodeJsonDeSerializerSchema implements 
KeyedDeserializationSchema{
public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset);
}

//Flapmap to check each message and apply validation
public class SgStreamingTask extends RichFlatMapFunction {
void flatMap(ObjectNode streamData, Collector out);
}

//persist three state variables and apply business logic
public class ArtifactOverlapProvider extends RichFlatMapFunction>
implements WindowFunction {
public void apply(String key, GlobalWindow window, Iterable values, 
Collector out);
}

//Apply business logic
public class SgRocProvider implements WindowFunction{
public void apply(String key, GlobalWindow window, Iterable values, 
Collector out);
}

//persist 3 state variables and apply business logic
public class CGMExcursionProviderStream extends RichFlatMapFunction>
implements WindowFunction{
public void apply(String key, GlobalWindow window, Iterable values, 
Collector out);

}

Thanks
Kamaal


> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise  wrote:
> Hi Mohammed,
> 
> something is definitely wrong in your setup. You can safely say that you can 
> process 1k records per second and core with Kafka and light processing, so 
> you shouldn't even need to go distributed in your case.
> 
> Do you perform any heavy computation? What is your flatMap doing? Are you 
> emitting lots of small records from one big record?
> 
> Can you please remove all rebalance and report back? Rebalance is 
> counter-productive if you don't exactly know that you need it.
> 
>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal  
>> wrote:
>> Hi Fabian,
>> 
>> Just an update,
>> 
>> Problem 2:-
>> 
>> Caused by: org.apache.kafka.common.errors.NetworkException
>> It is resolved. It was because we exceeded the number of allowed
>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>> unused topics and partitions to resolve the issue.
>> 
>> Problem 1:-
>> 
>> I increased the kafka partition and flink parallelism to 45 and the
>> throughput has improved from 20 minutes to 14 minutes (20K records).
>> Can you check the flink graph and let me know if there is anything
>> else that can be done here to improve the throughput further.
>> 
>> Thanks
>> 
>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>>  wrote:
>> >
>> > Hi Fabian,
>> >
>> > Problem 1:-
>> > -
>> > I have removed the print out sink's and ran the test again. This time
>> > the throughput is 17 minutes for 20K records (200 records every
>> > second). Earlier it was 20 minutes for 20K records. (parallelism 15
>> > and kafka partition of 15)
>> >
>> > Please find the attached application graph. Can you suggest what else
>> > is required further to improve the throughput.
>> >
>> > Problem 2:-
>> > -
>> > Also, I tried to increase the parallelism to 45 from 15 (also
>> > increasing the kafka partition to 45 from 15) to see if this helps in
>> > getting a better throughput.
>> >
>> > After increasing the partition, I am facing the Network issue with
>> > Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
>> > with 15 partitions for the kafka topic. This could be an issue with

Re: Stream join with (changing) dimension in Kafka

2021-09-22 Thread Caizhi Weng
Hi!

What type of time attribute is u_ts? If it is an event time attribute then
this query you're running is an event time temporal table join, which will
pause outputting records until the watermark from both inputs has risen
above the row time of that record.

As the dimension table changes quite slowly, I would recommend using the
processing-time temporal table join (or the so-called lookup table join)
instead. See this example from the Scala API unit test:

val proctimeOrders: Table = util.addDataStream[(Long, String)](
  "ProctimeOrders", 'o_amount, 'o_currency, 'o_proctime.proctime)
val proctimeRatesHistory: Table = util.addDataStream[(String, Int)](
  "ProctimeRatesHistory", 'currency, 'rate, 'proctime.proctime)

The documentation of the Table API indeed lacks quite a lot of information. I
would recommend trying out the SQL API instead, which is a superset of the
Table API and is more expressive and easier to understand.
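
In Java Table API terms, a rough, untested sketch of the same idea (using the
names from the test above and assuming a tableEnv plus the usual static imports
of $ and call) would be: build the temporal table function on the
processing-time attribute, so the join always uses the latest known version of
the dimension row at the moment a record from the main stream arrives:

TemporalTableFunction rates = proctimeRatesHistory
    .createTemporalTableFunction($("proctime"), $("currency"));
tableEnv.createTemporaryFunction("Rates", rates);

// processing-time version of the join: results are emitted as soon as an order arrives
Table result = proctimeOrders.joinLateral(
    call("Rates", $("o_proctime")),
    $("o_currency").isEqual($("currency")));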


On Wed, Sep 22, 2021 at 4:45 PM John Smith  wrote:

> Hi,
>
> I'm trying to use temporal join in Table API to enrich a stream of
> pageview events with a slowly changing dimension of user information.
> The pageview events are in a kafka topic called *pageviews* and the user
> information are in a kafka topic keyed by *userid* and whenever there is
> an updated user event, it is appended to the *users* topic.
> I declare a table on the pageview topic with watermark strategy of
> *WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1))* and a
> table on the users topic with watermark strategy of
> * WatermarkStrategy.forMonotonousTimestamps().*
>
> Here is the code for the temporal join:
>
> Table pv = getPageview(env, tableEnv, properties).
> select(
> $("timestamp").as("pv_ts"),
> $("userid").as("pv_userid"),
> $("pageid").as("pv_pageid")
> );
> Table usr = getUsers(env, tableEnv, properties)
> .select(
> $("timestamp").as("u_ts"),
> $("userid").as("u_userid"),
> $("regionid"),
> $("gender")
> );
>
> TemporalTableFunction userFunction = 
> usr.createTemporalTableFunction($("u_ts"), $("u_userid"));
> tableEnv.createTemporaryFunction("usrFun", userFunction);
>
> Table enrichedPV = pv.joinLateral(call("usrFun", $("pv_ts")), 
> $("pv_userid").isEqual($("u_userid")));
>
> enrichedPV.execute().print();
>
> When I run this, I get result like the following which only is triggered
> when there are new messages pushed into both pageviews and users topics:
>
>
> +----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
> | op |                   pv_ts | pv_userid | pv_pageid |                    u_ts | u_userid | regionid | gender |
> +----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
> | +I | 2021-09-22 08:28:05.346 |    User_8 |   Page_99 | 2021-09-22 08:28:04.769 |   User_8 | Region_1 |  OTHER |
> | +I | 2021-09-22 08:28:12.377 |    User_3 |   Page_88 | 2021-09-22 08:28:08.823 |   User_3 | Region_8 | FEMALE |
> | +I | 2021-09-22 08:28:15.385 |    User_7 |   Page_73 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
> | +I | 2021-09-22 08:28:16.391 |    User_7 |   Page_97 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
> | +I | 2021-09-22 08:28:17.396 |    User_7 |   Page_43 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
> | +I | 2021-09-22 08:28:18.400 |    User_6 |   Page_43 | 2021-09-22 08:28:15.854 |   User_6 | Region_5 |  OTHER |
>
> However, I want to trigger a result whenever a new pageview message
> arrives and not wait on the user side.
> Do I have any obvious mistake in my code that I cannot get this behavior?
> Also is there any code example that I can try where the main stream is
> enriched when there is a new event regardless of having any new event in
> the dimension side? Flink documentation on temporal join especially for
> TableAPI is really thin!
>
> Thanks in advance.
>
>
>
>
>


Stream join with (changing) dimension in Kafka

2021-09-22 Thread John Smith
Hi,

I'm trying to use a temporal join in the Table API to enrich a stream of pageview
events with a slowly changing dimension of user information.
The pageview events are in a kafka topic called *pageviews* and the user
information is in a kafka topic keyed by *userid*; whenever there is an
updated user event, it is appended to the *users* topic.
I declare a table on the pageview topic with watermark strategy of
*WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1))* and a
table on the users topic with watermark strategy of
* WatermarkStrategy.forMonotonousTimestamps().*

Here is the code for the temporal join:

Table pv = getPageview(env, tableEnv, properties).
select(
$("timestamp").as("pv_ts"),
$("userid").as("pv_userid"),
$("pageid").as("pv_pageid")
);
Table usr = getUsers(env, tableEnv, properties)
.select(
$("timestamp").as("u_ts"),
$("userid").as("u_userid"),
$("regionid"),
$("gender")
);

TemporalTableFunction userFunction =
usr.createTemporalTableFunction($("u_ts"), $("u_userid"));
tableEnv.createTemporaryFunction("usrFun", userFunction);

Table enrichedPV = pv.joinLateral(call("usrFun", $("pv_ts")),
$("pv_userid").isEqual($("u_userid")));

enrichedPV.execute().print();

When I run this, I get result like the following which only is triggered
when there are new messages pushed into both pageviews and users topics:

+----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
| op |                   pv_ts | pv_userid | pv_pageid |                    u_ts | u_userid | regionid | gender |
+----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
| +I | 2021-09-22 08:28:05.346 |    User_8 |   Page_99 | 2021-09-22 08:28:04.769 |   User_8 | Region_1 |  OTHER |
| +I | 2021-09-22 08:28:12.377 |    User_3 |   Page_88 | 2021-09-22 08:28:08.823 |   User_3 | Region_8 | FEMALE |
| +I | 2021-09-22 08:28:15.385 |    User_7 |   Page_73 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
| +I | 2021-09-22 08:28:16.391 |    User_7 |   Page_97 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
| +I | 2021-09-22 08:28:17.396 |    User_7 |   Page_43 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
| +I | 2021-09-22 08:28:18.400 |    User_6 |   Page_43 | 2021-09-22 08:28:15.854 |   User_6 | Region_5 |  OTHER |

However, I want to trigger a result whenever a new pageview message arrives,
without waiting on the user side.
Is there an obvious mistake in my code that prevents this behavior?
Also, is there any code example I can try where the main stream is enriched
whenever a new event arrives, regardless of whether there is a new event on
the dimension side? The Flink documentation on temporal joins, especially for
the Table API, is really thin!

Thanks in advance.


Re: S3 access permission error

2021-09-22 Thread Yangze Guo
I'm not an expert on S3. If it is not a credential issue, have you
finished the checklist in this doc [1]?

[1] 
https://aws.amazon.com/premiumsupport/knowledge-center/emr-s3-403-access-denied/?nc1=h_ls

Best,
Yangze Guo

On Wed, Sep 22, 2021 at 3:39 PM Dhiru  wrote:
>
>
> Not sure @yangze ... but other services deployed in the same place are able to
> access the s3 bucket. The link you shared is the recommended way; if we already
> have access to s3, should we not be able to skip passing credentials?
>
> On Wednesday, September 22, 2021, 02:59:05 AM EDT, Yangze Guo 
>  wrote:
>
>
> You might need to configure the access credential. [1]
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>
> Best,
> Yangze Guo
>
> On Wed, Sep 22, 2021 at 2:17 PM Dhiru  wrote:
> >
> >
> > i see org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326) plugin 
> > is not able to create folder , not sure if I need to change something
> > Whereas when We are trying to pass from the local laptop and passing  aws 
> > credentails its able to create a folder and running as expected
> > On Wednesday, September 22, 2021, 01:39:04 AM EDT, Dhiru 
> >  wrote:
> >
> >
> > flink image I have added both s3 plugin
> > FROM flink:1.11.3-scala_2.12-java11
> > RUN mkdir ./plugins/flink-s3-fs-presto
> > RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar  ./plugins/flink-s3-fs-presto/
> > RUN mkdir ./plugins/flink-s3-fs-hadoop
> > RUN cp ./opt/flink-s3-fs-hadoop-1.11.3.jar  ./plugins/flink-s3-fs-hadoop/
> >
> > some part of flink-conf.yaml  ( I tried with both s3a and s3  )
> ># REQUIRED: set storage location for job metadata in remote storage
> >  state.backend: filesystem
> >  state.backend.fs.checkpointdir: 
> > s3a://msc-actigraph-test-bucket/flink-checkpointing/checkpoints
> >  state.checkpoints.dir: 
> > s3a://msc-actigraph-test-bucket/flink-checkpointing/externalized-checkpoints
> >  state.savepoints.dir: 
> > s3a://msc-actigraph-test-bucket/flink-checkpointing/savepoints
> >  high-availability.storageDir: 
> > s3a://msc-actigraph-test-bucket/flink-checkpointing/storagedir
> >  s3.path.style.access: true
> >
> > org.apache.flink.runtime.rest.handler.RestHandlerException: Could not 
> > execute application. at 
> > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
> >  at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown 
> > Source) at 
> > java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
> > Source) at 
> > java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown 
> > Source) at 
> > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
> > Source) at 
> > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
> > Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at 
> > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> >  Source) at 
> > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> > Source) at java.base/java.lang.Thread.run(Unknown Source) Caused by: 
> > java.util.concurrent.CompletionException: 
> > org.apache.flink.util.FlinkRuntimeException: Could not execute application. 
> > at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
> > Source) at 
> > java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
> > Source) ... 7 more Caused by: org.apache.flink.util.FlinkRuntimeException: 
> > Could not execute application. at 
> > org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
> >  at 
> > org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
> >  at 
> > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
> >  ... 7 more Caused by: 
> > org.apache.flink.client.program.ProgramInvocationException: The main method 
> > caused an error: Failed to execute job 'DeduplicationJob'. at 
> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> >  at 
> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >  at 
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at 
> > org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
> >  ... 9 more Caused by: org.apache.flink.util.FlinkException: Failed to 
> > execute job 'DeduplicationJob'. at 
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1829)
> >  at 
> > 

Re: S3 access permission error

2021-09-22 Thread Dhiru
 
Not sure @yangze ... but other services deployed in the same place are able to
access the s3 bucket. The link you shared is the recommended way; if we already
have access to s3, should we not be able to skip passing credentials?
On Wednesday, September 22, 2021, 02:59:05 AM EDT, Yangze Guo 
 wrote:  
 
 You might need to configure the access credential. [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials

Best,
Yangze Guo

On Wed, Sep 22, 2021 at 2:17 PM Dhiru  wrote:
>
>
> i see org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326) plugin is 
> not able to create folder , not sure if I need to change something
> Whereas when We are trying to pass from the local laptop and passing  aws 
> credentails its able to create a folder and running as expected
> On Wednesday, September 22, 2021, 01:39:04 AM EDT, Dhiru 
>  wrote:
>
>
> flink image I have added both s3 plugin
> FROM flink:1.11.3-scala_2.12-java11
> RUN mkdir ./plugins/flink-s3-fs-presto
> RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar  ./plugins/flink-s3-fs-presto/
> RUN mkdir ./plugins/flink-s3-fs-hadoop
> RUN cp ./opt/flink-s3-fs-hadoop-1.11.3.jar  ./plugins/flink-s3-fs-hadoop/
>
> some part of flink-conf.yaml  ( I tried with both s3a and s3  )
>    # REQUIRED: set storage location for job metadata in remote storage
>      state.backend: filesystem
>      state.backend.fs.checkpointdir: 
>s3a://msc-actigraph-test-bucket/flink-checkpointing/checkpoints
>      state.checkpoints.dir: 
>s3a://msc-actigraph-test-bucket/flink-checkpointing/externalized-checkpoints
>      state.savepoints.dir: 
>s3a://msc-actigraph-test-bucket/flink-checkpointing/savepoints
>      high-availability.storageDir: 
>s3a://msc-actigraph-test-bucket/flink-checkpointing/storagedir
>      s3.path.style.access: true
>
> org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
> application. at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
>  at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown 
> Source) at 
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
> Source) at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
> Source) at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source) at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source) at java.base/java.lang.Thread.run(Unknown Source) Caused by: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.util.FlinkRuntimeException: Could not execute application. 
> at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
> Source) at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
> Source) ... 7 more Caused by: org.apache.flink.util.FlinkRuntimeException: 
> Could not execute application. at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
>  at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
>  at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
>  ... 7 more Caused by: 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Failed to execute job 'DeduplicationJob'. at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
> at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
>  ... 9 more Caused by: org.apache.flink.util.FlinkException: Failed to 
> execute job 'DeduplicationJob'. at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1829)
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
>  at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
>  at 

Re: S3 access permission error

2021-09-22 Thread Yangze Guo
You might need to configure the access credential. [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
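
For example, if you go with static credentials (just a sketch; on AWS the
IAM-based setup described in [1] is the recommended way, and then no keys are
needed at all), the keys would go into flink-conf.yaml:

# flink-conf.yaml (sketch, replace the placeholders with your own values)
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>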

Best,
Yangze Guo

On Wed, Sep 22, 2021 at 2:17 PM Dhiru  wrote:
>
>
> i see org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326) plugin is 
> not able to create folder , not sure if I need to change something
> Whereas when We are trying to pass from the local laptop and passing  aws 
> credentails its able to create a folder and running as expected
> On Wednesday, September 22, 2021, 01:39:04 AM EDT, Dhiru 
>  wrote:
>
>
> flink image I have added both s3 plugin
> FROM flink:1.11.3-scala_2.12-java11
> RUN mkdir ./plugins/flink-s3-fs-presto
> RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar  ./plugins/flink-s3-fs-presto/
> RUN mkdir ./plugins/flink-s3-fs-hadoop
> RUN cp ./opt/flink-s3-fs-hadoop-1.11.3.jar  ./plugins/flink-s3-fs-hadoop/
>
> some part of flink-conf.yaml   ( I tried with both s3a and s3  )
> # REQUIRED: set storage location for job metadata in remote storage
>  state.backend: filesystem
>  state.backend.fs.checkpointdir: 
> s3a://msc-actigraph-test-bucket/flink-checkpointing/checkpoints
>  state.checkpoints.dir: 
> s3a://msc-actigraph-test-bucket/flink-checkpointing/externalized-checkpoints
>  state.savepoints.dir: 
> s3a://msc-actigraph-test-bucket/flink-checkpointing/savepoints
>  high-availability.storageDir: 
> s3a://msc-actigraph-test-bucket/flink-checkpointing/storagedir
>  s3.path.style.access: true
>
> org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
> application. at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
>  at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown 
> Source) at 
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
> Source) at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
> Source) at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source) at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source) at java.base/java.lang.Thread.run(Unknown Source) Caused by: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.util.FlinkRuntimeException: Could not execute application. 
> at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
> Source) at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
> Source) ... 7 more Caused by: org.apache.flink.util.FlinkRuntimeException: 
> Could not execute application. at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
>  at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
>  at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
>  ... 7 more Caused by: 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Failed to execute job 'DeduplicationJob'. at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
> at 
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
>  ... 9 more Caused by: org.apache.flink.util.FlinkException: Failed to 
> execute job 'DeduplicationJob'. at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1829)
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
>  at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
>  at io.epiphanous.flinkrunner.flink.BaseFlinkJob.run(BaseFlinkJob.scala:45) 
> at io.epiphanous.flinkrunner.FlinkRunner.process1(FlinkRunner.scala:56) at 
> io.epiphanous.flinkrunner.FlinkRunner.process(FlinkRunner.scala:33) at 
> com.mdsol.flink.delivery_streams.Runner$.run(Runner.scala:25) at 
> 

Re: S3 access permission error

2021-09-22 Thread Dhiru
 
I see org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326): the plugin is
not able to create the folder, and I am not sure if I need to change something.
Whereas when we run from the local laptop and pass AWS credentials, it is able to
create the folder and runs as expected.
On Wednesday, September 22, 2021, 01:39:04 AM EDT, Dhiru  wrote:
 
flink image I have added both s3 plugin
FROM flink:1.11.3-scala_2.12-java11
RUN mkdir ./plugins/flink-s3-fs-presto
RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar  ./plugins/flink-s3-fs-presto/
RUN mkdir ./plugins/flink-s3-fs-hadoop
RUN cp ./opt/flink-s3-fs-hadoop-1.11.3.jar  ./plugins/flink-s3-fs-hadoop/

some part of flink-conf.yaml   ( I tried with both s3a and s3  )
    # REQUIRED: set storage location for job metadata in remote storage
    state.backend: filesystem
    state.backend.fs.checkpointdir: s3a://msc-actigraph-test-bucket/flink-checkpointing/checkpoints
    state.checkpoints.dir: s3a://msc-actigraph-test-bucket/flink-checkpointing/externalized-checkpoints
    state.savepoints.dir: s3a://msc-actigraph-test-bucket/flink-checkpointing/savepoints
    high-availability.storageDir: s3a://msc-actigraph-test-bucket/flink-checkpointing/storagedir
    s3.path.style.access: true

org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application. at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
 at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) 
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source) at 
java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source) at java.base/java.lang.Thread.run(Unknown Source) Caused by: 
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute application. at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
Source) at 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
Source) ... 7 more Caused by: org.apache.flink.util.FlinkRuntimeException: 
Could not execute application. at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
 at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
 at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
 ... 7 more Caused by: 
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to execute job 'DeduplicationJob'. at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
 ... 9 more Caused by: org.apache.flink.util.FlinkException: Failed to execute 
job 'DeduplicationJob'. at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1829)
 at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
 at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
 at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
 at io.epiphanous.flinkrunner.flink.BaseFlinkJob.run(BaseFlinkJob.scala:45) at 
io.epiphanous.flinkrunner.FlinkRunner.process1(FlinkRunner.scala:56) at 
io.epiphanous.flinkrunner.FlinkRunner.process(FlinkRunner.scala:33) at 
com.mdsol.flink.delivery_streams.Runner$.run(Runner.scala:25) at 
com.mdsol.flink.delivery_streams.Runner$.main(Runner.scala:7) at 
com.mdsol.flink.delivery_streams.Runner.main(Runner.scala) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown 
Source) at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source) at java.base/java.lang.reflect.Method.invoke(Unknown