Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-17 Thread Yangze Guo
I second Thomas that we can support both Java 8 and 11. Best, Yangze Guo On Wed, Mar 18, 2020 at 12:12 PM Thomas Weise wrote: > > --> > > On Mon, Mar 16, 2020 at 1:58 AM Andrey Zagrebin wrote: >> >> Thanks for the further feedback Thomas and Yangze. >> >> > A generic, dynamic configuration

Re: state schema evolution for case classes

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi Apoorv, Flink currently does not natively support schema evolution for state types using Scala case classes [1]. So, as Roman has pointed out, there are two possible ways for you to do that: - Implementing a custom serializer that supports schema evolution for your specific Scala case classes,

Re: state schema evolution for case classes

2020-03-17 Thread Apoorv Upadhyay
Thanks a lot. Also, can you share an example where this has been implemented? I have gone through the docs, but it still doesn't work. On Wed, Feb 26, 2020 at 7:59 PM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > Hi Apoorv, > > You can achieve this by implementing custom serializers

Flink Weekly | Weekly Community Update - 2020/03/18

2020-03-17 Thread LakeShen
Hello everyone, this is the ninth issue of Flink Weekly, compiled by Shen Lei (LakeShen). It mainly covers recent community development progress, mailing-list Q&A, and technical blog posts from the Chinese Flink community. Community development progress: [Table API & SQL] Jingsong Li started a discussion of FLIP-115 on supporting a FileSystem connector in Flink Table. FLIP-115 mainly covers: 1. Supporting a FileSystem Table Factory in Flink Table, with the csv/parquet/orc/json/avro formats. 2. Supporting, in streaming applications or in Flink

Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs

2020-03-17 Thread Bashar Abdul-Jawad
StateAssignmentOperation.checkStateMappingCompleteness doesn't check for UserDefinedOperatorIDs (specified using setUidHash), causing the exception:
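For context on the API involved: `setUidHash` lets a user pin an operator's ID directly to a savepoint operator hash instead of a generated one. A minimal sketch, with an invented topology and a placeholder hash value (not from the original report):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements("a", "b", "c")
   .map(String::toUpperCase)
   // Pin this operator's ID to an existing savepoint hash (32 hex chars).
   // The value below is a placeholder for illustration only.
   .setUidHash("2e588ce1c86a9d46e2e85186773ce4fd")
   .print();

env.execute("uid-hash-example");
```

The thread's point is that `StateAssignmentOperation.checkStateMappingCompleteness` validates state against operator IDs but, per the report, not against these user-defined hashes.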

Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-17 Thread Thomas Weise
--> On Mon, Mar 16, 2020 at 1:58 AM Andrey Zagrebin wrote: > Thanks for the further feedback Thomas and Yangze. > > > A generic, dynamic configuration mechanism based on environment variables > is essential and it is already supported via envsubst and an environment > variable that can supply a

Re: Questions about using the Streaming File Sink

2020-03-17 Thread Yun Gao
Judging from the error, GenericRecord probably cannot be serialized; for now you could use a custom data type for the transfer instead. -- From: 58683632 <58683...@qq.com> Send Time: 2020 Mar. 17 (Tue.) 13:33 To: user-zh Subject: Questions about using the Streaming File Sink. The Streaming File Sink uses the parquet avro format for bulk writes; the code is as follows: final

Re: JobMaster does not register with ResourceManager in high availability setup

2020-03-17 Thread tison
Sorry, I mixed up the logs; that one belongs to a previous failure. Could you try to reproduce the problem with DEBUG-level logging? From the log we know that the JM & RM had been elected as leader but the listener didn't work. However, we don't know whether the leader didn't publish the leader info or

Re: Scala Shell gives error "rest.address must be set"

2020-03-17 Thread Jeff Zhang
I agree, this is really confusing for users. Do you mind creating a ticket for that? Craig Foster wrote on Wed, Mar 18, 2020 at 8:36 AM: > If I specify these options, it seems to work...but I thought I could > have this dynamically determined when submitting jobs just using the > "yarn" option: > >

Re: Flink SQL, How can i set parallelism in clause of group by ?

2020-03-17 Thread Benchao Li
Hi forideal, Currently, there is no way to change an operator's parallelism for SQL. All operators use the global parallelism as their parallelism, except for some 'global' operators which can only have a parallelism of 1. BTW, does changing the parallelism of all operators meet your need? forideal

Re: Scala Shell gives error "rest.address must be set"

2020-03-17 Thread Craig Foster
Yeah, I was wondering about that. I'm using `/usr/lib/flink/bin/start-scala-shell.sh yarn`-- previously I'd use `/usr/lib/flink/bin/start-scala-shell.sh yarn -n ${NUM}` but that deprecated option was removed. On Tue, Mar 17, 2020 at 4:11 PM Jeff Zhang wrote: > > It looks like you are running

Re: Scala Shell gives error "rest.address must be set"

2020-03-17 Thread Jeff Zhang
It looks like you are running under standalone mode; what is your command to start the Scala shell? Craig Foster wrote on Wed, Mar 18, 2020 at 5:23 AM: > Hi: > When I upgraded from Flink 1.9.1 to Flink 1.10.0 I can't execute > programs at the Scala shell. > > It gives me an error that the REST address must be

Re: Flink on Kubernetes Vs Flink Natively on Kubernetes

2020-03-17 Thread Pankaj Chand
Thank you, Yang and Xintong! Best, Pankaj On Mon, Mar 16, 2020, 9:27 PM Yang Wang wrote: > Hi Pankaj, > > Just like Xintong has said, the biggest difference of Flink on Kubernetes > and native > integration is dynamic resource allocation. Since the latter has en > embedded K8s > client and

Custom Exception Handling

2020-03-17 Thread Anil Alfons K
Hi Community, I am reading data from Kafka. The FlinkKafkaConsumer reads the data, and then some application-specific logic runs in a process function. If I receive any invalid data, I throw a custom exception, and it's handled in the process function itself. This invalid data is taken out as side

Flink SQL, How can i set parallelism in clause of group by ?

2020-03-17 Thread forideal
Hello everyone, I'm a Flink SQL user and I have a question: how can I set the parallelism for a GROUP BY clause? For example: SELECT T.user_id, D.user_name (SELECT user_id, MIN(processtime) from my_table group by

Re: RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread John Morrow
Hi Gordon, That sounds good. My first thought was that if I have to break up the logic I'd end up with: BroadcastFunction1 --> AsyncFunction --> BroadcastFunction2 ...with Broadcast1 & BroadcastFunction2 needing the same broadcast state, and that state could change while an item is being

Re: Help me understand this Exception

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi, The exception stack you posted simply means that the next operator in the chain failed to process the output watermark. There should be another exception, which would explain why some operator was closed / failed and eventually leading to the above exception. That would provide more insight

Help me understand this Exception

2020-03-17 Thread aj
Hi, I am running a streaming job with generating watermark like this : public static class SessionAssigner implements AssignerWithPunctuatedWatermarks { @Override public long extractTimestamp(GenericRecord record, long previousElementTimestamp) { long timestamp = (long)
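The truncated assigner above can be completed into a minimal punctuated-watermark sketch; the field name `event_ts` and the 5-second lateness bound are assumptions for illustration, not taken from the original job:

```java
public static class SessionAssigner
        implements AssignerWithPunctuatedWatermarks<GenericRecord> {

    @Override
    public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
        // Assumes the record carries an epoch-millis field named "event_ts".
        return (long) record.get("event_ts");
    }

    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord lastElement, long extractedTimestamp) {
        // Punctuated: a watermark may be emitted after every element.
        // Here we allow 5 seconds of out-of-orderness.
        return new Watermark(extractedTimestamp - 5000);
    }
}
```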

Re: Hadoop user jar for flink 1.9 plus

2020-03-17 Thread Chesnay Schepler
You can download flink-shaded-hadoop from the downloads page: https://flink.apache.org/downloads.html#additional-components On 17/03/2020 15:56, Vishal Santoshi wrote: We have been on flink 1.8.x on production and were planning to go to flink 1.9 or above. We have always used hadoop uber jar

Re: Communication between two queries

2020-03-17 Thread Mikael Gordani
No worries, and great idea! I will play around with it and see what I manage to do. Cheers! On Tue, Mar 17, 2020 at 3:59 PM, Piotr Nowojski wrote: > Ops, sorry there was a misleading typo/auto correction in my previous > e-mail. Second sentence should have been: > > > First of all you would have to

Re: Flink YARN app terminated before the client receives the result

2020-03-17 Thread tison
JIRA created as https://jira.apache.org/jira/browse/FLINK-16637 Best, tison. Till Rohrmann wrote on Tue, Mar 17, 2020 at 5:57 PM: > @Tison could you create an issue to track the problem. Please also link > the uploaded log file for further debugging. > > I think the reason why it worked in Flink 1.9 could

Re: Communication between two queries

2020-03-17 Thread Piotr Nowojski
Ops, sorry there was a misleading typo/auto correction in my previous e-mail. Second sentence should have been: > First of all you would have to use event time semantic for consistent results Piotrek > On 17 Mar 2020, at 14:43, Piotr Nowojski wrote: > > Hi, > > Yes, you are looking in the

Hadoop user jar for flink 1.9 plus

2020-03-17 Thread Vishal Santoshi
We have been on flink 1.8.x on production and were planning to go to flink 1.9 or above. We have always used hadoop uber jar from https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2-uber but it seems they go up to 1.8.3 and their distribution ends 2019. How do or where do we

Re: Apache Airflow - Question about checkpointing and re-run a job

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi, I believe that the title of this email thread was a typo, and should be "Apache Flink - Question about checkpointing and re-run a job." I assume this because the contents of the previous conversations seem to be purely about Flink. Otherwise, as far as I know, there doesn't seem to be any

Re: what is the difference between map vs process on a datastream?

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi, As David already explained, they are similar in that you may output zero to multiple records for both process and flatMap functions. However, ProcessFunctions also expose to the user much more powerful functionality, such as registering timers, outputting to side outputs, etc. Cheers,

Re: RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi John, Have you considered letting the BroadcastProcessFunction output events that indicate extra external HTTP requests needs to be performed, and have them consumed by a downstream async IO operator to complete the HTTP request? That could work depending on what exactly you need to do in your
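Gordon's suggestion amounts to splitting the operator in two: the broadcast function decides which HTTP calls are needed and emits request descriptors, and a downstream async I/O operator executes them. A rough wiring sketch (all class names here are invented placeholders):

```java
// 1. Broadcast side decides, based on broadcast state, which requests to make.
DataStream<HttpRequestDescriptor> pending =
        events.connect(rulesBroadcastStream)
              .process(new EmitNeededRequests());   // a BroadcastProcessFunction

// 2. Async I/O operator performs the actual HTTP calls without blocking.
DataStream<EnrichedEvent> enriched =
        AsyncDataStream.unorderedWait(
                pending,
                new HttpRequestAsyncFunction(),     // extends RichAsyncFunction
                30, TimeUnit.SECONDS,               // timeout per request
                100);                               // max in-flight requests
```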

Re: Communication between two queries

2020-03-17 Thread Piotr Nowojski
Hi, Yes, you are looking in the right directions with the watermarks. First of all you would have to use event time semantic for constant results. With processing time everything would be simpler, but it would be more difficult to reason about the results (your choice). Secondly, you would

Re: what is the difference between map vs process on a datastream?

2020-03-17 Thread kant kodali
Got it! and thanks a lot for that. So there is no difference between flatmap and process then? On Tue, Mar 17, 2020 at 5:29 AM David Anderson wrote: > Map applies a MapFunction (or a RichMapFunction) to a DataStream and does > a one-to-one transformation of the stream elements. > > Process

Re: Issues with Watermark generation after join

2020-03-17 Thread Dominik Wosiński
Hey sure, the original Temporal Table SQL is: SELECT e.*, f.level AS level FROM enablers AS e, LATERAL TABLE (Detectors(e.timestamp)) AS f WHERE e.id = f.id And the previous SQL query to join A is something like: SELECT * FROM A te, B s WHERE s.id = te.id AND s.level = te.level

Re: what is the difference between map vs process on a datastream?

2020-03-17 Thread David Anderson
Map applies a MapFunction (or a RichMapFunction) to a DataStream and does a one-to-one transformation of the stream elements. Process applies a ProcessFunction, which can produce zero, one, or many events in response to each event. And when used on a keyed stream, a KeyedProcessFunction can use
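David's distinction can be made concrete with a short sketch (the stream and the numbers are invented for illustration):

```java
// map: exactly one output per input element.
DataStream<Integer> doubled = numbers.map(n -> n * 2);

// process: zero, one, or many outputs per input element,
// plus access to timers and side outputs on keyed streams.
DataStream<Integer> expanded = numbers.process(
        new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer n, Context ctx, Collector<Integer> out) {
                if (n % 2 == 0) {
                    out.collect(n);       // may emit more than once...
                    out.collect(n * 10);
                }                         // ...or nothing at all for odd inputs.
            }
        });
```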

Re: Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-17 Thread LakeShen
Thank you, I will do that. jinhai wang wrote on Tue, Mar 17, 2020 at 5:58 PM: > Hi LakeShen > > You must also assign IDs to all operators of an application. Otherwise, > you may not be able to recover from a checkpoint > > Doc: >

Re: Apache Airflow - Question about checkpointing and re-run a job

2020-03-17 Thread kant kodali
Does Airflow have a Flink operator? I am not seeing one; can you please point me to it? On Mon, Nov 18, 2019 at 3:10 AM M Singh wrote: > Thanks Congxian for your answer and reference. Mans > > On Sunday, November 17, 2019, 08:59:16 PM EST, Congxian Qiu < > qcx978132...@gmail.com> wrote: > > > Hi >

RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread John Morrow
Hi Flink Users, I have a BroadcastProcessFunction and in the processElement method I sometimes need to do some http requests, depending on the broadcast state. Because I'm doing http requests, I'd prefer the function to be async, like RichAsyncFunction.asyncInvoke(), but RichAsyncFunction

what is the difference between map vs process on a datastream?

2020-03-17 Thread kant kodali
what is the difference between map vs process on a datastream? they look very similar. Thanks!

Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

2020-03-17 Thread Becket Qin
Actually, it might be better to create another ticket; FLINK-8093 was mainly complaining about the JMX bean collision when there are multiple tasks running in the same TM. Jiangjie (Becket) Qin On Tue, Mar 17, 2020 at 6:33 PM Becket Qin wrote: > Hi Till, > > It looks FLINK-8093

Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

2020-03-17 Thread Becket Qin
Hi Till, It looks like FLINK-8093 reports the same issue, although the reported information is not exactly correct, as this should not cause the producer to fail. I'll take care of the ticket. Thanks, Jiangjie (Becket) Qin On Tue, Mar 17, 2020 at
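The collision being discussed is ordinary JMX behaviour: registering a second MBean under an `ObjectName` that is already taken raises `InstanceAlreadyExistsException`. A self-contained illustration using only the JDK (the domain and key properties below are made up and unrelated to Flink's or Kafka's real metric names):

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxCollision {
    // Standard MBean convention: class Demo exposes interface DemoMBean.
    public interface DemoMBean { int getValue(); }
    public static class Demo implements DemoMBean {
        public int getValue() { return 42; }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("demo:type=Consumer,id=client-1");
        if (server.isRegistered(name)) {
            server.unregisterMBean(name); // keep the example re-runnable in one JVM
        }
        server.registerMBean(new Demo(), name);
        try {
            // Second registration under the same name: this is the collision
            // the consumer metrics run into.
            server.registerMBean(new Demo(), name);
        } catch (InstanceAlreadyExistsException e) {
            System.out.println("collision: second registration rejected");
        }
    }
}
```

Running it prints the collision line, showing why two consumers with the same client id in one JVM produce the warning.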

Re: AfterMatchSkipStrategy for timed out patterns

2020-03-17 Thread Till Rohrmann
Hi Dominik, at the moment the AfterMatchSkipStrategy is only applied to fully matching sequences. In case of timeouts, the AfterMatchSkipStrategy will be ignored because technically it is not a match. Cheers, Till On Mon, Mar 16, 2020 at 5:39 PM Dominik Wosiński wrote: > > Hey all, > > I was

Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

2020-03-17 Thread Till Rohrmann
I agree with Piotr that we need some type of checkpoint which supports rescaling. Otherwise, the reactive mode and auto-scaling will only work if the system has taken a savepoint which by definition should only be done by the user. Cheers, Till On Mon, Mar 16, 2020 at 8:39 AM Piotr Nowojski

Re: Need help on timestamp type conversion for Table API on Pravega Connector

2020-03-17 Thread Till Rohrmann
Thanks for reporting this issue Brian. I'm not a Table API expert but I know that there is some work on the type system ongoing. I've pulled Timo and Jingsong into the conversation who might be able to tell you what exactly changed and whether the timestamp issue might be caused by the changes.

Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

2020-03-17 Thread Till Rohrmann
@Becket do we already have a JIRA ticket to track this effort? Cheers, Till On Mon, Mar 16, 2020 at 4:07 AM Becket Qin wrote: > Hi Sidney, > > The WARN logging you saw was from the AbstractPartitionDiscoverer which is > created by FlinkKafkaConsumer itself. It has an internal consumer which >

Re: Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-17 Thread jinhai wang
Hi LakeShen You must also assign IDs to all operators of an application. Otherwise, you may not be able to recover from a checkpoint Doc: https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state
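In practice, assigning the IDs jinhai mentions looks like this (operator names invented for illustration):

```java
env.addSource(new FlinkKafkaConsumer<>(topic, schema, props))
   .uid("kafka-source")        // stable ID so source state maps back on restore
   .keyBy(Event::getUserId)
   .process(new SessionLogic())
   .uid("session-logic")       // keyed state is matched by this ID, not by position
   .addSink(new MySink())
   .uid("event-sink");
```

With explicit uids, the topology can be reordered or extended and a checkpoint or savepoint will still map state to the right operators.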

Re: Flink YARN app terminated before the client receives the result

2020-03-17 Thread Till Rohrmann
@Tison could you create an issue to track the problem? Please also link the uploaded log file for further debugging. I think the reason why it worked in Flink 1.9 could have been that we had an async callback in the longer chain, which broke the flow of execution and allowed sending the response.

Re: Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-17 Thread Till Rohrmann
Let us know if you run into any problems. The state processor API is still young and could benefit from as much feedback as possible. Cheers, Till On Tue, Mar 17, 2020 at 2:57 AM LakeShen wrote: > Wow, this feature looks so good, I will look into it. > Thank you for replying, Till.

Re: Very large _metadata file

2020-03-17 Thread Till Rohrmann
Did I understand you correctly that you use the union state to synchronize the per-partition state across all operators in order to obtain a global overview? If this is the case, then this will only work in case of a failover. Only then are all operators restarted with the union of all

Re: How do I get the outPoolUsage value inside my own stream operator?

2020-03-17 Thread Felipe Gutierrez
Hi, just for the record, I am collecting the values of outPoolUsage using the piece of code below inside my stream operator: OperatorMetricGroup operatorMetricGroup = (OperatorMetricGroup) this.getMetricGroup(); TaskMetricGroup taskMetricGroup = operatorMetricGroup.parent(); MetricGroup

Re: Communication between two queries

2020-03-17 Thread Mikael Gordani
Hi Piotr! Continuing with my scenario, since both of the queries will share the same sink, I've realized that some issues will appear when I switch queries. Especially with regards to stateful operators, e.g aggregation. Let me provide an example: So, let say that both of the queries ingest a

Re: Re: Garbled output when using Flink SQL to insert data into Hive

2020-03-17 Thread 吕先生
Hi, executing select * from temp_h1 in both Hive and Flink works fine. In Flink SQL, the query output is abnormal. (I have re-uploaded, as an attachment, the temp_h2 files from HDFS.) On 2020-03-17 17:05:21, "Jingsong Li" wrote: >Hi, > >- SinkConversionToRow is Flink's internal data-structure conversion and should be unrelated to result correctness; look at the sink. >- There seems to be only one attachment (the log); I don't see the garbled file. >- In Flink, try "select * from temp_h1" and check the result? >-

Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-17 Thread Yang Wang
Hi Andrey, Thanks for your explanation. > About the logging What I mean is that we cannot forward the stdout/stderr to local files and the docker stdout at the same time using log4j. For the jobmanager.log/taskmanager.log, it works quite well since we only need to add a console appender in the

Re: Garbled output when using Flink SQL to insert data into Hive

2020-03-17 Thread Jingsong Li
Hi, - SinkConversionToRow is Flink's internal data-structure conversion and should be unrelated to result correctness; look at the sink. - There seems to be only one attachment (the log); I don't see the garbled file. - In Flink, try "select * from temp_h1" and check the result? - In Hive, try "select * from temp_h1" and check the result? - In Hive, try "select * from temp_h2" and check the result? Best, Jingsong Lee On Tue, Mar 17, 2020 at 4:25 PM 吕先生 wrote: > Hi all, > >

Garbled output when using Flink SQL to insert data into Hive

2020-03-17 Thread 吕先生
Hi all! Please help me with this problem: I am using Flink SQL for batch computation on Hive (the goal is to replace Spark SQL batch jobs). Specifically, I read data from Hive, insert it back into a Hive table, and then when I select the data, it comes out garbled. Software versions: hadoop 2.9.1 and hadoop 2.8.5, hive-2.3.3 and hive-2.3.4, flink 1.10.0, zeppelin 0.9.0, Flink SQL gateway 0.1. I switched between several hadoop and hive versions (all downloaded from official sites) and also tested the Flink SQL CLI, Zeppelin, Flink SQL

Re: Question about RocksDBStateBackend Compaction Filter state cleanup

2020-03-17 Thread Andrey Zagrebin
Hi Lake, When the Flink doc mentions a state entry in RocksDB, we mean one key/value pair stored by user code over any keyed state API (keyed context in keyed operators, obtained e.g. from a keyBy() transformation). In case of a map or list, the doc means a map key/value or list element. -

Re: Question about RocksDBStateBackend Compaction Filter state cleanup

2020-03-17 Thread Andrey Zagrebin
Hi Lake, When the Flink doc mentions a state entry in RocksDB, we mean one key/value pair stored by user code over any keyed state API (keyed context in keyed operators obtained e.g. from keyBy() transformation). In case of map or list, the doc means map key/value and list element. -

Re: Recommended batchSize setting for VectorizedRowBatch when reading ORC files

2020-03-17 Thread Jingsong Li
Hi, 1.10 does not convert to Row; it only provides a row view. Previous versions converted to Row, and this difference has a big impact on performance. Best, Jingsong Lee On Tue, Mar 17, 2020 at 3:31 PM jun su wrote: > hi Jingsong Li, > > Thanks for the reply, I understand what you mean. > I noticed this while reading the ORC-related code in flink-1.10; the flink-1.10 release notes mention > vectorized ORC reading, but comparing with older versions, the VectorizedRowBatch approach has always been used, >

Re: Question about RocksDBStateBackend Compaction Filter state cleanup

2020-03-17 Thread Yun Tang
Hi Lake Flink leverages RocksDB's background compaction mechanism to filter out out-of-TTL entries (by comparing against the current timestamp provided by RocksDB's time_provider) so they do not stay in newly compacted data. This iterates over data entries with FlinkCompactionFilter::FilterV2

Re: Flink YARN app terminated before the client receives the result

2020-03-17 Thread DONG, Weike
Hi Tison & Till and all, I have uploaded the client, taskmanager and jobmanager log to Gist ( https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and I can reproduce this bug every time when trying to cancel Flink 1.10 jobs on YARN. Besides, in earlier Flink versions like 1.9,

Re: Recommended batchSize setting for VectorizedRowBatch when reading ORC files

2020-03-17 Thread jun su
hi Jingsong Li, Thanks for the reply, I understand what you mean. I noticed this while reading the ORC-related code in flink-1.10: the flink-1.10 release notes mention vectorized ORC reading, but comparing with older versions, the VectorizedRowBatch approach has always been used; flink-1.10 only adds adaptation for different Hive versions. I also see that the related code comes from your pull request; is that right? Jingsong Li wrote on Tue, Mar 17, 2020 at 12:04 PM: > Hi, > >

Question about RocksDBStateBackend Compaction Filter state cleanup

2020-03-17 Thread LakeShen
Hi community, I'm looking at the Flink RocksDBStateBackend state cleanup; the code currently looks like this: StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .cleanupInRocksdbCompactFilter(1000) .build(); > The default background cleanup for RocksDB backend queries the current
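The snippet from the question, reformatted with the meaning of each setting spelled out (values as in the original):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        // State entries expire 1 second after they were last written.
        .newBuilder(Time.seconds(1))
        // Filter expired entries during RocksDB background compaction;
        // refresh the current timestamp from Flink after every
        // 1000 state entries processed by the compaction filter.
        .cleanupInRocksdbCompactFilter(1000)
        .build();
```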