Re: [SURVEY] How do people upgrade their Flink applications?

2019-10-08 Thread Yang Wang
Just to share some thoughts. If a Flink application needs to be upgraded, whether the user jar or the Flink version, the following steps need to be done. 1. Unit test, integration test, stability test, benchmark test 2. Deploy the upgraded application to a test environment with fake inputs/outputs to

Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

2019-10-08 Thread Filip Niksic
Here is the solution I currently have. It turned out to be more complicated than I expected. It would be great if a more experienced Flink user could comment and point out the shortcomings. And if you have other ideas for achieving the same thing, let me know! Let's start like in the original

Re: Flink restoring a job from a checkpoint

2019-10-08 Thread Yun Tang
Hi Vishwas This is because Flink's checkpoint mechanism offers you more capability. You can resume from the offset within a specific checkpoint instead of the last committed offset, not to mention you can benefit from restoring the last timer state, operator state and keyed state. Best Yun Tang

Re: Reply: Re: Flink SQL: Unknown or invalid SQL statement.

2019-10-08 Thread Henry
Thanks! On 2019-10-09 11:19:17, "pengchengl...@163.com" wrote: >Hi, you can refer to https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sqlClient.html > > > >pengchengl...@163.com > >From: Henry >Sent: 2019-10-09 10:44 >To: user-zh >Subject: Re:Re: Flink SQL :Unknown or invalid SQL statement. >

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-08 Thread Yang Wang
Hi Vijay, If you are using an HA solution, I think you do not need to specify the savepoint. Instead the checkpoint is used. The checkpoint is done automatically and periodically based on your configuration. When the jobmanager/taskmanager fails or the whole cluster crashes, it could always recover

Reply: Re: Flink SQL: Unknown or invalid SQL statement.

2019-10-08 Thread pengchengl...@163.com
Hi, you can refer to https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sqlClient.html pengchengl...@163.com From: Henry Sent: 2019-10-09 10:44 To: user-zh Subject: Re:Re: Flink SQL :Unknown or invalid SQL statement. Ha, thank you very much! How do I define which parts of the table to use in the yaml? Is there any related material? I couldn't find any. Thanks. On

Re: [SURVEY] How do people upgrade their Flink applications?

2019-10-08 Thread Konstantinos Kallas
Thank you for the thoughts Yang. If you don't mind, I would be interested to know how you deploy the upgraded application to the pre-production environment. Do you collect historical data and then run the upgraded application using that, or do you duplicate new incoming data on the fly, running

Re: How to write stream data to other Hadoop Cluster by StreamingFileSink

2019-10-08 Thread Yang Wang
Hi Jun Zhang, I think you could add both HDFS cluster configurations to your hdfs-site.xml. The following config keys need to be added. Then you could use both HDFS clusters in your Flink job. dfs.nameservices: mycluster1,mycluster2 dfs.ha.namenodes.mycluster1: nn1,nn2
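The keys mentioned above can be sketched as an hdfs-site.xml fragment. This is a minimal illustration, not a complete HA configuration; the namenode hostnames and port are placeholders, and failover-proxy-provider and rpc-address entries for the remaining namenodes would also be required in practice.

```xml
<!-- Hypothetical fragment: declares two nameservices so one Flink job can
     address both hdfs://mycluster1/... and hdfs://mycluster2/... -->
<property>
  <name>dfs.nameservices</name>
  <value>mycluster1,mycluster2</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster1</name>
  <value>nn1,nn2</value>
</property>
<property>
  <!-- placeholder host/port; repeat for nn2 and for mycluster2 -->
  <name>dfs.namenode.rpc-address.mycluster1.nn1</name>
  <value>namenode1.example.com:8020</value>
</property>
```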

Re: Flink restoring a job from a checkpoint

2019-10-08 Thread Congxian Qiu
Hi Vishwas Currently, Flink can only restore a retained checkpoint or savepoint with the parameter `-s` [1][2]; otherwise, it will start from scratch. ``` checkpoint ---> bin/flink run -s :checkpointMetaDataPath [:runArgs] savepoint ---> bin/flink run -s :savepointPath [:runArgs] ``` [1]

Re: Problem with savepoint deserialization

2019-10-08 Thread Congxian Qiu
Hi Steven From the exception, it seems the serializers used before and after the change are incompatible. I'm not very familiar with Scala case classes; maybe you can debug it locally to see which serializer is used before and after the change for the case class. Best, Congxian Steven Nelson wrote on Wed, Oct 9, 2019

Re: flink 1.9

2019-10-08 Thread Steven Nelson
https://flink.apache.org/downloads.html#apache-flink-190 Sent from my iPhone > On Oct 8, 2019, at 3:47 PM, Vishal Santoshi wrote: > > where do I get the corresponding jar for 1.9 ? > > flink-shaded-hadoop2-uber-2.7.5-1.8.0.jar > > Thanks..

flink 1.9

2019-10-08 Thread Vishal Santoshi
where do I get the corresponding jar for 1.9 ? flink-shaded-hadoop2-uber-2.7.5-1.8.0.jar Thanks..

Re: Problem with savepoint deserialization

2019-10-08 Thread Steven Nelson
No problem :) I wasn’t able to find documentation on what can and cannot be upgraded for case classes. I had assumed the same rules that apply to POJO schema upgrading applied to case classes. Has someone put together rules for case classes? I also should have mentioned we are running 1.9

Re: Flink restoring a job from a checkpoint

2019-10-08 Thread Yun Tang
Hi Vishwas If you did not configure your org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is GROUP_OFFSET by default, which means "start from committed offsets in ZK / Kafka brokers of a specific consumer group". And you need to enable checkpointing so that Kafka offsets are

Re: Problem with savepoint deserialization

2019-10-08 Thread Steven Nelson
Are you sure? I just restarted the job with the new version, but not from a savepoint, took a new savepoint, and it seemed to work from there. It just seemed like it couldn’t upgrade the schema during restore. Sent from my iPhone > On Oct 8, 2019, at 1:22 PM, Aleksandar Mastilovic > wrote:

Re: Problem with savepoint deserialization

2019-10-08 Thread Aleksandar Mastilovic
The Option class is not serializable; if you put something serializable into that case class you wouldn’t have problems. > On Oct 8, 2019, at 8:17 AM, Steven Nelson wrote: > > Hello! We are working with a Scala based pipeline. > > We changed > > case class Record(orgId: Int) > > To > >

Flink restoring a job from a checkpoint

2019-10-08 Thread Vishwas Siravara
Hi guys, I have a Flink streaming job which streams from a Kafka source. There is no state in the job, just a simple filter, map and write to a Kafka sink. Suppose I stop my job and then submit the job again to the cluster with the same consumer group, will the job restore automatically from the

Backpressure tuning/failure

2019-10-08 Thread Owen Rees-Hayward
Hi, I am having a few issues with the Flink (v1.8.1) backpressure default settings, which lead to poor throughput in a comparison I am doing between Storm, Spark and Flink. I have a setup that simulates a progressively worse straggling task, which Storm and Spark cope with relatively well.

Problem with savepoint deserialization

2019-10-08 Thread Steven Nelson
Hello! We are working with a Scala based pipeline. We changed case class Record(orgId: Int) To case class Record(orgId: Int, operationId:Option[String] = None) And now our savepoints fail with this exception: org.apache.flink.util.StateMigrationException: The new state serializer cannot be

Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

2019-10-08 Thread Filip Niksic
Hi Chesnay, Thanks for the reply. While your solution ultimately does use multiple partitions, from what I can tell the underlying processing is still sequential. Imagine a stream where barriers are quite rare, say a million values followed by a barrier. Then these million values all end up at

Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

2019-10-08 Thread Chesnay Schepler
In other words, you need a way to partition the stream such that a series of items followed by a barrier is never interrupted. I'm wondering whether you could just apply DataStream#partitionCustom to your source: public static class BarrierPartitioner implements Partitioner { private int
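The partitioning idea above can be sketched in plain Java. This is a simplified stand-in, not Flink's Partitioner interface: the "BARRIER" sentinel string and the round-robin advance to the next partition are assumptions for illustration.

```java
import java.util.List;

public class BarrierPartitioner {
    private int currentPartition = 0;
    private final int numPartitions;

    public BarrierPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Every item in a barrier-delimited group maps to the same partition;
    // the barrier itself closes the group and advances to the next partition,
    // so a group of items plus its trailing barrier is never split up.
    public int partition(String item) {
        int target = currentPartition;
        if ("BARRIER".equals(item)) {
            currentPartition = (currentPartition + 1) % numPartitions;
        }
        return target;
    }

    public static void main(String[] args) {
        BarrierPartitioner p = new BarrierPartitioner(2);
        for (String item : List.of("v1", "v2", "BARRIER", "v3", "BARRIER")) {
            System.out.println(item + " -> partition " + p.partition(item));
        }
    }
}
```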

Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

2019-10-08 Thread Filip Niksic
Hi Yun, The behavior with increased parallelism should be the same as with no parallelism. In other words, for the input from the previous email, the output should always be 1, 3, regardless of parallelism. Operationally, the partial sums maintained in each subtask should somehow be aggregated

Re: Computing two aggregate functions on the same window

2019-10-08 Thread Chesnay Schepler
There doesn't seem to be a built-in way to apply multiple aggregations to a window. You could use an aggregate function that combines other aggregate functions, but admittedly this will get unwieldy as the number of functions increases: public static class MultiAggregateFunction, ACC2, OUT2,
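The combining idea can be sketched without Flink's AggregateFunction API: one pass over the window's elements updates two accumulators (here sum and max, chosen as examples) and emits both results together. In real Flink code each aggregate would keep its own accumulator type and the pair would be a Tuple2.

```java
import java.util.List;

public class MultiAggregate {
    // Simplified stand-in for applying two aggregate functions to the same
    // window: a single pass feeds both accumulators and returns a
    // 2-element array standing in for a 2-tuple of results.
    public static int[] sumAndMax(List<Integer> window) {
        int sum = 0;                  // accumulator of the first aggregate
        int max = Integer.MIN_VALUE;  // accumulator of the second aggregate
        for (int v : window) {
            sum += v;
            max = Math.max(max, v);
        }
        return new int[] {sum, max};
    }

    public static void main(String[] args) {
        int[] result = sumAndMax(List.of(3, 7, 2));
        System.out.println(result[0] + ", " + result[1]); // 12, 7
    }
}
```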

Re: File renaming

2019-10-08 Thread XW Marvin
To add to my question: Flink 1.9.0. 1. Using StreamingFileSink to consume Kafka data into HDFS 2. EXACTLY_ONCE is enabled 3. StreamingFileSink.forBulkFormat with Parquet format and snappy compression. The files written to HDFS are all in the part-{parallel-task}-{count} format. How can I rename them? marvin.mxw wrote on Tue, Oct 8, 2019 at 6:11 PM: > To add to my question > > Flink 1.9.0 > > 1. Using StreamingFileSink to consume Kafka data into HDFS > 2.

Re: Passing parameters to filter function (in DataStreams)

2019-10-08 Thread Chesnay Schepler
You can compute the threshold ahead of time and reference it directly in the filter function. (Below are 2 examples, depending on whether you like lambdas or not) final int threshold = computeThreshold(); temperatureStream.filter(new FilterFunction() { @Override public boolean
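The pattern can be sketched in plain Java without Flink: `computeThreshold` is a hypothetical placeholder for an expensive computation, run once outside the filter rather than once per record, and the lambda captures the effectively-final result, just as a Flink FilterFunction can capture a value computed on the client.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ThresholdFilter {
    // Placeholder for an expensive computation; run it once, not per record.
    public static int computeThreshold() {
        return 20;
    }

    public static List<Integer> filterAbove(List<Integer> temperatures) {
        final int threshold = computeThreshold(); // computed ahead of time
        // The lambda captures the effectively-final threshold.
        return temperatures.stream()
                .filter(t -> t > threshold)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(filterAbove(List.of(18, 25, 31, 12, 40))); // [25, 31, 40]
    }
}
```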

Passing parameters to filter function (in DataStreams)

2019-10-08 Thread Komal Mariam
Hi everyone, Suppose I have to compute a filter condition Integer threshold = computeThreshold(); If I: temperatureStream.filter(new FilterFunction() { @Override public boolean filter(Integer temperature) throws Exception { Integer threshold = computeThreshold(); return temperature >

State & Fault Tolerance in Table API and SQL Implementations

2019-10-08 Thread Vaibhav Singh
Hi, We are looking into a production use case of using Flink, to process multiple streams of data from Kafka topics. We plan to perform joins on these streams and then output aggregations on that data. We plan to use the Table API and SQL capabilities for this. We need to prepare a plan to

Computing two aggregate functions on the same window

2019-10-08 Thread Frank Wilson
Hi, In the datastream api is there a way to take two aggregate functions and apply them to the same window? The output would be a stream of 2-tuples containing the result of each aggregate function. I feel it should be possible to combine previously written functions rather than writing a

RE: Building Flink 1.6.4 fails with "object scala in compiler mirror not found"

2019-10-08 Thread Aikio, Torste
This happens with an empty cache. Is it possible that the build itself is generating corrupt jars then? How can I check for that? I think this issue should be reproducible if you try to run the build inside a docker container like I do. /Torste Aikio -Original Message- From: Chesnay

Re: File renaming

2019-10-08 Thread Zili Chen
I took a quick look at the rules for assembling the file name; you can try the RowFormatBuilder#withPartFilePrefix and RowFormatBuilder#withPartFileSuffix methods. That should let you set your file name to -- where the middle segment is hard-coded. If you have a more custom renaming need, please describe specifically why you need to rename. Best, tison. Wesley Peng wrote on Tue, Oct 8, 2019 at 5:43 PM: > May you want to rename them in HDFS with FileSystem.rename method? > > > on

Re: File renaming

2019-10-08 Thread Wesley Peng
May you want to rename them in HDFS with the FileSystem.rename method? on 2019/10/8 17:39, yanggang_it_job wrote: The files written to HDFS are all in the part-{parallel-task}-{count} format. How can I rename them?

File renaming

2019-10-08 Thread yanggang_it_job
Dear All Flink 1.9.0 1. Using StreamingFileSink to consume Kafka data into HDFS 2. EXACTLY_ONCE is enabled. The files written to HDFS are all in the part-{parallel-task}-{count} format. How can I rename them? Best

Re: Implementing CheckpointableInputFormat

2019-10-08 Thread Chesnay Schepler
You have to use StreamExecutionEnvironment#createFileInput for implementing CheckpointableInputFormat to have any effect. This internally results in it being used by the MonitoringFileSource. If you use StreamExecutionEnvironment#createInput nothing will be checkpointed for the source; and yes

Re: Building Flink 1.6.4 fails with "object scala in compiler mirror not found"

2019-10-08 Thread Chesnay Schepler
The only cause I know for errors such as this are corrupt jars. Try cleaning the maven cache and see if the issue persists. On 07/10/2019 17:36, Aikio, Torste wrote: Hi, I'm trying to build Flink 1.6.4 from source and some of the tests for flink-scala module are failing for me. Are there

flink StreamingFileSink to HDFS: renaming files

2019-10-08 Thread XW Marvin
flink 1.9.0 1. Using StreamingFileSink to consume Kafka data into HDFS 2. EXACTLY_ONCE is enabled. The files written to HDFS are all in the part-{parallel-task}-{count} format. How can I rename them?

Re: Group by multiple fields

2019-10-08 Thread Miguel Farrajota
Awesome, thanks! On Tue, Oct 8, 2019 at 5:25 AM Congxian Qiu wrote: > Hi Miguel > > Maybe the doc[1] about how to specifying the keys can help. > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/api_concepts.html#specifying-keys > Best, > Congxian > > > Miguel Farrajota

Re: kafka offset not working

2019-10-08 Thread Benjamin Cuthbert
So it looks like checkpoints are not working; each time a job is stopped and started it is a different checkpoint. Using savepoints manually seems to work if you make a call to save the savepoint and then run with -s savepoint location. Why do checkpoints not work? On Mon, Oct 7, 2019 at 10:25 PM

Re: [QUESTION] How to parallelize with explicit punctuation in Flink?

2019-10-08 Thread Yun Gao
Hi Filip, I have one question on the problem: what is the expected behavior when the parallelism of the FlatMapFunction is increased to more than 1? Should each subtask maintain the partial sum of all values received, and whenever the barrier is received, then it just

Re: Building Flink 1.6.4 fails with "object scala in compiler mirror not found"

2019-10-08 Thread Congxian Qiu
Hi, if you just want to skip the tests, could you try adding `-DskipTests` when executing the Maven command? Best, Congxian Aikio, Torste wrote on Mon, Oct 7, 2019 at 11:36 PM: > Hi, > > I'm trying to build Flink 1.6.4 from source and some of the tests for > flink-scala module are failing for me. Are there some

Re: How to control TaskManager resources in Flink 1.8

2019-10-08 Thread Xintong Song
Hi 逸尘, For the case you describe, you can first compute the total number of slots needed from the job's parallelism, and then control the total resource usage by configuring the number of slots and the resource size per TM. The relevant configuration options are: - vcores per TM: yarn.containers.vcores - memory per TM: taskmanager.heap.size - slots per TM: taskmanager.numberOfTaskSlots. These can be set in the flink-conf.yaml configuration file, or appended as dynamic options at the end of the `flink run` command; when submitting on YARN the dynamic option format is '-yD ='.
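The three options above can be sketched as a flink-conf.yaml fragment. The values here are examples only, to be tuned against the job's total parallelism; for instance, a job with parallelism 16 and 4 slots per TM would need 4 TaskManagers.

```yaml
# Example values only; tune to your job's total parallelism.
yarn.containers.vcores: 2          # vcores per TaskManager
taskmanager.heap.size: 2048m       # memory per TaskManager
taskmanager.numberOfTaskSlots: 4   # slots per TaskManager
```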