Just to share some thoughts.
If a Flink application needs to be upgraded, whether it is the user jar or the
Flink version, the following steps need to be done:
1. Unit test, integration test, stability test, benchmark test
2. Deploy the upgraded application to a test environment with fake
inputs/outputs to
Here is the solution I currently have. It turned out to be more complicated
than I expected. It would be great if a more experienced Flink user could
comment and point out the shortcomings. And if you have other ideas for
achieving the same thing, let me know!
Let's start like in the original
Hi Vishwas
This is because Flink's checkpoint mechanism offers you more flexibility: you
can resume from the offsets within a specific checkpoint instead of the last
committed offset, not to mention you benefit from restoring the last timer
state, operator state and keyed state.
Best
Yun Tang
Thanks a lot!
On 2019-10-09 11:19:17, "pengchengl...@163.com" wrote:
>Hi, you can refer to https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sqlClient.html
>
>
>
>pengchengl...@163.com
>
>From: Henry
>Sent: 2019-10-09 10:44
>To: user-zh
>Subject: Re:Re: Flink SQL: Unknown or invalid SQL statement.
>
Hi Vijay,
If you are using an HA solution, I think you do not need to specify the
savepoint; the checkpoint is used instead.
Checkpoints are taken automatically and periodically based on your
configuration. When the
jobmanager/taskmanager fails or the whole cluster crashes, it can always
recover
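A minimal sketch of the checkpoint setup this relies on, assuming a Flink 1.9-style DataStream job; the interval and retention policy are illustrative:
```
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 60s; recovery after a failure resumes from the latest one.
env.enableCheckpointing(60_000L);
// Keep the checkpoint around even if the job is cancelled, so it can also be
// restored manually with `bin/flink run -s <checkpointMetaDataPath>`.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```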
Hi, you can refer to https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sqlClient.html
pengchengl...@163.com
From: Henry
Sent: 2019-10-09 10:44
To: user-zh
Subject: Re:Re: Flink SQL: Unknown or invalid SQL statement.
Ha, thank you very, very much! How do I define which parts of the table to use
in the yaml file? Is there any related documentation? I couldn't find anything. Thanks.
On
Thank you for the thoughts Yang. If you don't mind, I would be interested to
know how you deploy the upgraded application to the pre-production environment.
Do you collect historical data and then run the upgraded application using
that, or do you duplicate new incoming data on the fly, running
Hi Jun Zhang,
I think you could add the configurations for two hdfs clusters in your
hdfs-site.xml. The following config keys need to be added; then you could use
both hdfs clusters in your flink job.
dfs.nameservices: mycluster1,mycluster2
dfs.ha.namenodes.mycluster1: nn1,nn2
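A hedged sketch of how these entries could look in hdfs-site.xml, following the standard HDFS HA client pattern; the hostnames are illustrative, and mycluster2 needs the same kinds of entries:
```
<property>
  <name>dfs.nameservices</name>
  <value>mycluster1,mycluster2</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster1</name>
  <value>nn1,nn2</value>
</property>
<!-- Illustrative RPC addresses for each namenode of mycluster1. -->
<property>
  <name>dfs.namenode.rpc-address.mycluster1.nn1</name>
  <value>host1:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster1.nn2</name>
  <value>host2:8020</value>
</property>
<!-- Standard failover proxy provider for an HA nameservice. -->
<property>
  <name>dfs.client.failover.proxy.provider.mycluster1</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- Repeat the equivalent entries for mycluster2. -->
```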
Hi Vishwas
Currently, Flink can only restore a retained checkpoint or savepoint with the
parameter `-s`[1][2]; otherwise, it will start from scratch.
```
checkpoint --> bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint  --> bin/flink run -s :savepointPath [:runArgs]
```
[1]
Hi Steven
From the exception, it seems the serializers used before and after the change
are incompatible. I'm not very familiar with Scala case classes; maybe you can
debug it locally to see which serializer is used before and after the change
for the case class.
Best,
Congxian
Steven Nelson wrote on Wed, Oct 9, 2019
https://flink.apache.org/downloads.html#apache-flink-190
Sent from my iPhone
> On Oct 8, 2019, at 3:47 PM, Vishal Santoshi wrote:
>
> where do I get the corresponding jar for 1.9 ?
>
> flink-shaded-hadoop2-uber-2.7.5-1.8.0.jar
>
> Thanks..
where do I get the corresponding jar for 1.9 ?
flink-shaded-hadoop2-uber-2.7.5-1.8.0.jar
Thanks..
No problem :)
I wasn’t able to find documentation on what can and cannot be upgraded for case
classes. I had assumed the same rules that applied to POJO schema upgrading
applied to case classes. Has someone put together rules for case classes? I
also should have mentioned we are running 1.9
Hi Vishwas
If you did not configure your
org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is
GROUP_OFFSET by default, which means "start from committed offsets in ZK /
Kafka brokers of a specific consumer group". And you need to enable
checkpointing so that the Kafka offsets are
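A minimal sketch of that default setup, assuming the universal Kafka connector for Flink 1.9; the topic, group id and servers are illustrative:
```
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing must be enabled for the consumer to commit offsets back to Kafka.
env.enableCheckpointing(60_000L);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
// GROUP_OFFSET is the default; this call just makes it explicit.
consumer.setStartFromGroupOffsets();
env.addSource(consumer);
```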
Are you sure? I just restarted the job with the new version, but not from a
savepoint, took a new savepoint, and it seemed to work from there. It just
seemed like it couldn’t upgrade the schema during restore.
Sent from my iPhone
> On Oct 8, 2019, at 1:22 PM, Aleksandar Mastilovic
> wrote:
The Option class is not serializable; if you put something serializable into
that case class you wouldn’t have problems.
> On Oct 8, 2019, at 8:17 AM, Steven Nelson wrote:
>
> Hello! We are working with a Scala based pipeline.
>
> We changed
>
> case class Record(orgId: Int)
>
> To
>
> case class Record(orgId: Int, operationId: Option[String] = None)
Hi guys,
I have a flink streaming job which streams from a kafka source. There is no
state in the job, just a simple filter, map and write to a kafka sink.
Suppose I stop my job and then submit the job again to the cluster with the
same consumer group, will the job restore automatically from the
Hi,
I am having a few issues with the Flink (v1.8.1) backpressure default
settings, which lead to poor throughput in a comparison I am doing between
Storm, Spark and Flink.
I have a setup that simulates a progressively worse straggling task that
Storm and Spark cope with relatively well.
Hello! We are working with a Scala based pipeline.
We changed
case class Record(orgId: Int)
To
case class Record(orgId: Int, operationId: Option[String] = None)
And now our savepoints fail with this exception:
org.apache.flink.util.StateMigrationException: The new state serializer
cannot be
Hi Chesnay,
Thanks for the reply. While your solution ultimately does use multiple
partitions, from what I can tell the underlying processing is still
sequential. Imagine a stream where barriers are quite rare, say a million
values is followed by a barrier. Then these million values all end up at
In other words, you need a way to partition the stream such that a
series of items followed by a barrier is never interrupted.
I'm wondering whether you could just apply DataStream#partitionCustom to
your source:
public static class BarrierPartitioner implements Partitioner {
private int
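A hedged reconstruction of where that truncated snippet seems to be heading: a stateful partitioner that keeps each barrier-delimited run of items in one partition and advances when it sees a barrier. The String element type and the "BARRIER" marker are illustrative stand-ins for the poster's actual types:
```
import org.apache.flink.api.common.functions.Partitioner;

public static class BarrierPartitioner implements Partitioner<String> {
    private int currentPartition = 0;

    @Override
    public int partition(String key, int numPartitions) {
        int target = currentPartition;
        // The barrier itself stays with the run it terminates; items
        // after it go to the next partition.
        if ("BARRIER".equals(key)) {
            currentPartition = (currentPartition + 1) % numPartitions;
        }
        return target;
    }
}
```
It would be applied with something like stream.partitionCustom(new BarrierPartitioner(), value -> value), where the key selector simply forwards the element.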
Hi Yun,
The behavior with increased parallelism should be the same as with
parallelism 1. In other words, for the input from the previous email, the
output should always be 1, 3, regardless of parallelism. Operationally, the
partial sums maintained in each subtask should somehow be aggregated
There doesn't seem to be a built-in way to apply multiple aggregations
to a window.
You could use an aggregate function that combines other aggregate
functions, but admittedly this will get unwieldy as the number of
functions increases:
public static class MultiAggregateFunction<IN, ACC1, OUT1, ACC2, OUT2,
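A hedged completion of that idea: a wrapper AggregateFunction that delegates to two inner AggregateFunctions and emits both results as a Tuple2. The names and the two-function arity are illustrative:
```
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public static class MultiAggregateFunction<IN, ACC1, OUT1, ACC2, OUT2>
        implements AggregateFunction<IN, Tuple2<ACC1, ACC2>, Tuple2<OUT1, OUT2>> {

    private final AggregateFunction<IN, ACC1, OUT1> first;
    private final AggregateFunction<IN, ACC2, OUT2> second;

    public MultiAggregateFunction(AggregateFunction<IN, ACC1, OUT1> first,
                                  AggregateFunction<IN, ACC2, OUT2> second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public Tuple2<ACC1, ACC2> createAccumulator() {
        return Tuple2.of(first.createAccumulator(), second.createAccumulator());
    }

    @Override
    public Tuple2<ACC1, ACC2> add(IN value, Tuple2<ACC1, ACC2> acc) {
        // Feed every element to both inner accumulators.
        return Tuple2.of(first.add(value, acc.f0), second.add(value, acc.f1));
    }

    @Override
    public Tuple2<OUT1, OUT2> getResult(Tuple2<ACC1, ACC2> acc) {
        return Tuple2.of(first.getResult(acc.f0), second.getResult(acc.f1));
    }

    @Override
    public Tuple2<ACC1, ACC2> merge(Tuple2<ACC1, ACC2> a, Tuple2<ACC1, ACC2> b) {
        return Tuple2.of(first.merge(a.f0, b.f0), second.merge(a.f1, b.f1));
    }
}
```
It could then be passed to window(...).aggregate(new MultiAggregateFunction<>(aggA, aggB)), yielding the stream of 2-tuples the original question asks for.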
Some additional details on the question:
Flink 1.9.0
1. Using StreamingFileSink to consume Kafka data into HDFS
2. EXACTLY_ONCE is enabled
3. StreamingFileSink.forBulkFormat, Parquet format, snappy compression
The files written to HDFS are all named in the format
part-{parallel-task}-{count}
How can I rename them?
marvin.mxw wrote on Tue, Oct 8, 2019 at 6:11 PM:
> Some additional details on the question:
>
> Flink 1.9.0
>
> 1. Using StreamingFileSink to consume Kafka data into HDFS
> 2.
You can compute the threshold ahead of time and reference it directly in
the filter function.
(Below are 2 examples, depending on whether you like lambdas or not)
final int threshold = computeThreshold();
temperatureStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean
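A hedged completion of those two variants, assuming an Integer temperature stream and a hypothetical computeThreshold() helper:
```
// Compute the threshold once, up front, instead of per element.
final int threshold = computeThreshold();

// Anonymous-class style:
temperatureStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer temperature) {
        return temperature > threshold;
    }
});

// Lambda style:
temperatureStream.filter(temperature -> temperature > threshold);
```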
Hi everyone,
Suppose I have to compute a filter condition
Integer threshold = computeThreshold();
If I:
temperatureStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer temperature) throws Exception {
        Integer threshold = computeThreshold();
        return temperature >
Hi,
We are looking into a production use case of using Flink, to process
multiple streams of data from Kafka topics.
We plan to perform joins on these streams and then output aggregations on
that data. We plan to use the Table API and SQL capabilities for this.
We need to prepare a plan to
Hi,
In the DataStream API, is there a way to take two aggregate functions and
apply them to the same window? The output would be a stream of 2-tuples
containing the result of each aggregate function.
I feel it should be possible to combine previously written functions rather
than writing a
This happens with an empty cache. Is it possible that the build itself is
generating corrupt jars then? How can I check for that?
I think this issue should be reproducible if you try to run the build inside a
docker container like I do.
/Torste Aikio
-Original Message-
From: Chesnay
I took a quick look at the rules used to assemble the part file name; you can
try the two methods
RowFormatBuilder#withPartFilePrefix
RowFormatBuilder#withPartFileSuffix
They should let you set the prefix and suffix of the file name, as sketched below;
the middle section is hard-coded.
If you have a more custom renaming requirement, please explain concretely why
you need to rename.
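A hedged sketch of how those builder methods might be used, assuming they are exposed in your Flink version; the path and encoder are illustrative:
```
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///output"),
                      new SimpleStringEncoder<String>("UTF-8"))
        // Prefix/suffix wrap the fixed part-{subtask}-{count} middle section;
        // these two calls may not be available in every release.
        .withPartFilePrefix("sensor-data")
        .withPartFileSuffix(".txt")
        .build();
```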
Best,
tison.
Wesley Peng wrote on Tue, Oct 8, 2019 at 5:43 PM:
> May you want to rename them in HDFS with FileSystem.rename method?
>
>
> on
Maybe you want to rename them in HDFS with the FileSystem.rename method?
on 2019/10/8 17:39, yanggang_it_job wrote:
The files written to HDFS are all named in the format
part-{parallel-task}-{count}
How can I rename them?
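A hedged sketch of that suggestion using the Hadoop FileSystem API; the paths are illustrative, and this would have to run after the sink has finalized the part files:
```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

FileSystem fs = FileSystem.get(new Configuration());
// Rename a finished part file to a custom name.
boolean ok = fs.rename(
        new Path("hdfs:///output/part-0-1"),
        new Path("hdfs:///output/my-data-0-1.parquet"));
```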
Dear All
Flink 1.9.0
1. Using StreamingFileSink to consume Kafka data into HDFS
2. EXACTLY_ONCE is enabled
The files written to HDFS are all named in the format
part-{parallel-task}-{count}
How can I rename them?
Best
Best
You have to use StreamExecutionEnvironment#createFileInput for
implementing CheckpointableInputFormat to have any effect. This
internally results in it being used by the MonitoringFileSource.
If you use StreamExecutionEnvironment#createInput, nothing will be
checkpointed for the source; and yes
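For reference, a hedged sketch of the continuous-monitoring route, which in my understanding goes through createFileInput internally; the format and path are illustrative:
```
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TextInputFormat format = new TextInputFormat(new Path("hdfs:///input"));
// readFile builds the monitored file source, so a format that implements
// CheckpointableInputFormat can take part in checkpointing here.
DataStream<String> lines = env.readFile(
        format, "hdfs:///input",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        10_000L); // directory rescan interval in milliseconds
```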
The only cause I know for errors such as this is corrupt jars. Try
cleaning the maven cache and see if the issue persists.
On 07/10/2019 17:36, Aikio, Torste wrote:
Hi,
I'm trying to build Flink 1.6.4 from source and some of the tests for
flink-scala module are failing for me. Are there
flink 1.9.0
1. Using StreamingFileSink to consume Kafka data into HDFS
2. EXACTLY_ONCE is enabled
The files written to HDFS are all named in the format
part-{parallel-task}-{count}
How can I rename them?
Awesome, thanks!
On Tue, Oct 8, 2019 at 5:25 AM Congxian Qiu wrote:
> Hi Miguel
>
> Maybe the doc[1] about how to specify the keys can help.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/api_concepts.html#specifying-keys
> Best,
> Congxian
>
>
> Miguel Farrajota
So it looks like checkpoints are not working: each time a job is stopped and
started, it is a different checkpoint. Using savepoints manually seems to
work if you make a call to take the savepoint and then run the job with -s
<savepoint location>. Why do checkpoints not work?
On Mon, Oct 7, 2019 at 10:25 PM
Hi Filip,
I have one question on the problem: what is the expected behavior
when the parallelism of the FlatMapFunction is increased to more than 1?
Should each subtask maintain the partial sum of all values received, and
whenever the barrier is received, then it just
Hi,
if you just want to skip the tests, have you tried adding `-DskipTests` to the
maven command? See the invocation below.
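A typical invocation (adjust the goals to your build):
```
mvn clean install -DskipTests
```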
Best,
Congxian
Aikio, Torste wrote on Mon, Oct 7, 2019 at 11:36 PM:
> Hi,
>
> I'm trying to build Flink 1.6.4 from source and some of the tests for
> flink-scala module are failing for me. Are there some
Hi Yichen,
In the situation you describe, you can first work out the total number of slots
needed from the job's parallelism, and then control the total resource usage by
configuring the number of slots and the resource size of each TM. The relevant
configs are:
- vcores per TM: yarn.containers.vcores
- heap memory per TM: taskmanager.heap.size
- number of slots per TM: taskmanager.numberOfTaskSlots
These can be changed in the flink-conf.yaml config file, or appended as dynamic
options at the end of the `flink run` command; when submitting on YARN the
dynamic-option format is '-yD <key>=<value>', as in the example below.
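A hedged example of that dynamic-option form; the parallelism, values and jar name are illustrative:
```
flink run -m yarn-cluster -p 8 \
  -yD yarn.containers.vcores=2 \
  -yD taskmanager.heap.size=4096m \
  -yD taskmanager.numberOfTaskSlots=4 \
  ./my-job.jar
```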