Re: Making job fail on Checkpoint Expired?

2020-04-07 Thread Congxian Qiu
Hi Robin
Thanks for the detailed reply, and sorry for my late reply.
I think your request to fail the whole job when consecutive checkpoints
expire is valid; I've created an issue to track this[1].

For now, maybe the following steps can help you find the reason for the
timeout:

1. Find the subtask that has not acknowledged the checkpoint in the
checkpoint UI (call it A).
2. Check whether A is currently under backpressure:
2.1 If A is under backpressure, please fix that first.
2.2 If A is not under backpressure, check the TM log of A for anything
abnormal (you may need to enable debug logging for this step).

On the TM side, a snapshot consists of 1) barrier alignment (in exactly-once
mode; at-least-once does not need to align barriers); 2) the synchronous
phase; 3) the asynchronous phase.

Backpressure affects step 1; too many timers, high CPU consumption or high
disk utilization may affect step 2; disk or network performance may affect
step 3.

[1] https://issues.apache.org/jira/browse/FLINK-17043
Best,
Congxian
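
For reference, a minimal sketch of the checkpoint settings touched on in this thread (values are illustrative). Note that, as discussed above, in 1.9 the tolerable-failure counter only reacts to declined checkpoints, not expired ones:

```
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 minutes (illustrative value).
        env.enableCheckpointing(300_000L);

        CheckpointConfig config = env.getCheckpointConfig();
        // A checkpoint taking longer than this is expired (the failure mode discussed here).
        config.setCheckpointTimeout(600_000L);
        // In 1.9 this counter is only incremented for declined checkpoints, so
        // expired checkpoints alone will not fail the job (see FLINK-17043).
        config.setTolerableCheckpointFailureNumber(1);
    }
}
```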


Robin Cassan wrote on Fri, Apr 3, 2020 at 8:35 PM:

> Hi Congxian,
>
> Thanks for confirming! The reason I want this behavior is because we are
> currently investigating issues with checkpoints that keep getting timeouts
> after the job has been running for a few hours. We observed that, after a
> few timeouts, if the job was being restarted because of a lost TM for
> example, the next checkpoints would be working for a few more hours.
> However, if the job continues running and consuming more data, the next
> checkpoints will be even bigger and the chances of them completing in time
> are getting even thinner.
> Crashing the job is not a viable solution I agree, but it would allow us
> to generate data during the time we investigate the root cause of the
> timeouts.
>
> I believe that having the option to make the job restart after a few
> checkpoint timeouts would still help to avoid the snowball effect of
> incremental checkpoints being bigger and bigger if the checkpoints keep
> getting expired.
>
> I'd love to get your opinion on this!
>
> Thanks,
> Robin
>
> On Fri, Apr 3, 2020 at 11:17, Congxian Qiu wrote:
>
>> Currently, only declined checkpoints are counted into
>> `continuousFailureCounter`.
>> Could you please share why you want the job to fail when a checkpoint
>> expires?
>>
>> Best,
>> Congxian
>>
>>
>> Timo Walther wrote on Thu, Apr 2, 2020 at 11:23 PM:
>>
>>> Hi Robin,
>>>
>>> this is a very good observation and maybe even unintended behavior.
>>> Maybe Arvid in CC is more familiar with the checkpointing?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 02.04.20 15:37, Robin Cassan wrote:
>>> > Hi all,
>>> >
>>> > I am wondering if there is a way to make a flink job fail (not cancel
>>> > it) when one or several checkpoints have failed due to being expired
>>> > (taking longer than the timeout) ?
>>> > I am using Flink 1.9.2 and have set
>>> > `*setTolerableCheckpointFailureNumber(1)*` which doesn't do the trick.
>>> > Looking into the CheckpointFailureManager.java class, it looks like
>>> this
>>> > only works when the checkpoint failure reason is
>>> > `*CHECKPOINT_DECLINED*`, but the number of failures isn't incremented
>>> on
>>> > `*CHECKPOINT_EXPIRED*`.
>>> > Am I missing something?
>>> >
>>> > Thanks!
>>>
>>>


Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

2020-04-07 Thread Yun Tang
Hi Salva,

Sorry for missing your recent reply.
If you just want the models to be recoverable, you should choose operator 
state to store the "models". If you stick to keyed state, I cannot see how 
these models are related to the current processing key. As you can see, 
"models" is just a HashMap[(String, String), Model], and I don't see why we 
need to couple all the models to one specific key.

Best
Yun Tang
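
A minimal sketch of the operator-state approach suggested above, assuming a hypothetical Model type with toBytes()/fromBytes(); the models live in a plain Java map and are only serialized into operator ListState when a checkpoint is taken:

```
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class ModelHolder extends CoProcessFunction<String, String, String>
        implements CheckpointedFunction {

    // Hypothetical self-evolving model; it is updated by every element on the second input.
    static class Model implements java.io.Serializable {
        void update(String element) { /* ... */ }
        byte[] toBytes() { return new byte[0]; }
        static Model fromBytes(byte[] bytes) { return new Model(); }
    }

    private final Map<String, Model> models = new HashMap<>();
    private transient ListState<Tuple2<String, byte[]>> checkpointedModels;

    @Override
    public void processElement1(String value, Context ctx, Collector<String> out) {
        models.putIfAbsent(value, new Model());
    }

    @Override
    public void processElement2(String value, Context ctx, Collector<String> out) {
        // Only the in-memory models are touched on the hot path; no serialization here.
        models.values().forEach(m -> m.update(value));
        out.collect(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Models are serialized only when a checkpoint is taken.
        checkpointedModels.clear();
        for (Map.Entry<String, Model> e : models.entrySet()) {
            checkpointedModels.add(Tuple2.of(e.getKey(), e.getValue().toBytes()));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedModels = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>(
                        "models",
                        TypeInformation.of(new TypeHint<Tuple2<String, byte[]>>() {})));
        if (context.isRestored()) {
            for (Tuple2<String, byte[]> entry : checkpointedModels.get()) {
                models.put(entry.f0, Model.fromBytes(entry.f1));
            }
        }
    }
}
```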

From: Salva Alcántara 
Sent: Sunday, April 5, 2020 20:22
To: user@flink.apache.org 
Subject: Re: Using MapState clear, put methods in snapshotState within 
KeyedCoProcessFunction, valid or not?

Hi Yun,

In the end, I left the code like this

```
override def snapshotState(context: FunctionSnapshotContext): Unit = {
  for ((k, model) <- models) {
    modelsBytes.put(k, model.toBytes())
  }
}
```

I have verified with a simple test that most of the time checkpoints seem
to work fine. However, from time to time, the map state is not saved
properly (I get an empty map state). So it looks like updating the keyed
state like that within the `snapshotState` method is conceptually wrong;
indeed, this method does not receive any keyed context to start with. Because
of this, I think the user should not even be allowed to invoke `put` (nor
`clear`) on the map state object. That would help make things less
confusing.

The reason why I am trying to serialize my (keyed state) models inside
`snapshotState` is that these models are self-evolving and possess their
own (time-varying) state; otherwise I could just serialize them once after
creation in the `processElement1` method. So, given this situation, how could I
handle my use case? Ideally, I should only serialize them when checkpoints
are taken; in particular, I want to avoid having to serialize them after
every element received in `processElement2` (the state of my models changes
with each new element processed here). Maybe I cannot achieve my goals with
keyed state and need operator state instead.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Lu Niu
Hi, Congxian

Thanks for replying. Yeah, I also found those references. However, as I
mentioned in the original post, there is enough capacity on every disk. Also,
when I switch to the Presto file system, the problem goes away. Wondering
whether others have encountered a similar issue.

Best
Lu
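
For anyone hitting this with the Hadoop S3A file system: the "s3ablock-0001-" files come from S3A's disk-based upload buffering, and the DiskChecker error is raised when none of the directories in fs.s3a.buffer.dir (by default under hadoop.tmp.dir) is usable. A hedged sketch of the relevant settings, assuming the flink-s3-fs-hadoop plugin, which mirrors fs.s3a.* keys from flink-conf.yaml (otherwise set them in core-site.xml); the paths are illustrative:

```
# flink-conf.yaml (sketch)

# Local directories S3A uses to buffer upload blocks before sending them to S3.
fs.s3a.buffer.dir: /mnt/disk1/s3a,/mnt/disk2/s3a

# Alternatively, buffer uploads in memory instead of on local disk.
# fs.s3a.fast.upload.buffer: bytebuffer
```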

On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu  wrote:

> Hi
> From the stack, it seems the problem is "org.apache.flink.fs.shaded.
> hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not
> find any valid local directory for s3ablock-0001-". I googled the
> exception and found a related page[1]; could you please make sure
> there is enough space on the local disk.
>
> [1]
> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
> Best,
> Congxian
>
>
> Lu Niu wrote on Wed, Apr 8, 2020 at 8:41 AM:
>
>> Hi, flink users
>>
>> Did anyone encounter such an error? The error comes from S3AFileSystem, but
>> there is no capacity issue on any disk. We are using Hadoop 2.7.1.
>> ```
>>
>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
>> Could not open output stream for state backend
>>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>  at 
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>  at 
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>  ... 3 more
>> Caused by: java.io.IOException: Could not open output stream for state 
>> backend
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>>  at 
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>  at 
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>  at 
>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>  at 
>> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>>  at 
>> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>>  at 
>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>>  at 
>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
>>  at 
>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
>>  at 
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>  at 
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>  ... 5 more
>> Caused by: 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>>  Could not find any valid local directory for s3ablock-0001-
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
>>  at 
>> 

Re: Re: Kafka starts consuming from the beginning to catch up when new computation logic is added to a Flink job

2020-04-07 Thread yangxiaofei
Hi 苟刚,

Also, looking at your computation logic: the same data source contains
different types of data that need to be processed separately. You could try
using Flink's side outputs to split the stream; that makes the logic clearer,
and the program will also be much more efficient.

ss
yangx...@163.com
On Apr 7, 2020 at 19:54, LakeShen wrote:
Hi 苟刚,

In a Flink job, if checkpointing is enabled, the Kafka offsets are committed
after each completed checkpoint. If it is not enabled, the offsets are committed
via Kafka's auto-commit, which is enabled by default with an interval of 5 s.
As for the job always starting from the beginning again, my guess is that the code
sets the consumer to start from the earliest offset, i.e. you are using this
method: setStartFromEarliest[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

Best,
LakeShen
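
A minimal sketch of the start-position options discussed above (broker address, group id and topic name are illustrative):

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaStartPositionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With checkpointing enabled, offsets are committed back to Kafka on completed checkpoints.
        env.enableCheckpointing(60_000L);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Default behaviour: resume from the committed group offsets.
        consumer.setStartFromGroupOffsets();
        // consumer.setStartFromEarliest(); // re-reads the topic from the beginning (the behaviour described above)
        // consumer.setStartFromLatest();   // start from the tail, as Evan suggests below

        env.addSource(consumer).print();
        env.execute("kafka-start-position-sketch");
    }
}
```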

gang.gou wrote on Tue, Apr 7, 2020 at 4:17 PM:

OK, I will give it a try and share the results with everyone. Thanks!

On 2020/4/7 at 3:52 PM, "Evan" wrote:

The code I sent earlier seems to have been garbled, so I fixed it and am resending it.
I suggest that after obtaining the consumer you also call
consumer.setStartFromLatest(); this setting follows what the official documentation
describes. I translated it a while ago; you can check the section on "Kafka
Consumers start position configuration" at this link: https://www.jianshu.com/p/b753527b91a6



/**
  * @param env
  * @param topic
  * @param time the subscription time
  * @return
  * @throws IllegalAccessException
  */
  public static DataStreamSource

Re: New kafka producer on each checkpoint

2020-04-07 Thread Yun Tang
Hi Maxim

If you use the EXACTLY_ONCE semantic (instead of AT_LEAST_ONCE or NONE) for the 
Flink Kafka producer, it will create a new producer for every new checkpoint [1]. 
This is by design, and from my point of view a checkpoint interval of 10 seconds 
might be a bit too frequent. In general I think an interval of 3 minutes should be 
enough. If you cannot afford the source rewind time after a failover, you can make 
the interval more frequent.


[1] 
https://github.com/apache/flink/blob/980e31dcc29ec6cc60ed59569f1f1cb7c47747b7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L871

Best
Yun Tang
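
A minimal sketch of the semantic and checkpoint-interval trade-off described above (topic, broker and timeout values are illustrative):

```
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceProducerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A longer interval means fewer Kafka transactions (and fewer short-lived producers).
        env.enableCheckpointing(180_000L); // e.g. 3 minutes instead of 10 seconds

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Must outlive the checkpoint interval plus the maximum checkpoint duration.
        props.setProperty("transaction.timeout.ms", "900000");

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output-topic",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                        return new ProducerRecord<>("output-topic",
                                element.getBytes(StandardCharsets.UTF_8));
                    }
                },
                props,
                // EXACTLY_ONCE is what opens a new transactional producer per checkpoint.
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        env.fromElements("a", "b", "c").addSink(producer);
        env.execute("exactly-once-producer-sketch");
    }
}
```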

From: Maxim Parkachov 
Sent: Monday, April 6, 2020 23:16
To: user@flink.apache.org 
Subject: New kafka producer on each checkpoint

Hi everyone,

I'm trying to test the exactly-once functionality of my job under production 
load. The job reads from Kafka, uses the Kafka timestamp as event time, 
aggregates every minute and outputs to another Kafka topic. I use a checkpoint 
interval of 10 seconds.

Everything seems to be working fine, but when I look at the log at INFO level, 
I see that with each checkpoint a new Kafka producer is created and then closed 
again.

1. Is this how it is supposed to work?
2. Is a checkpoint interval of 10 seconds too frequent?

Thanks,
Maxim.


Re: State size Vs keys number performance

2020-04-07 Thread Congxian Qiu
Hi
I'll give some information from my side:
1. The performance of RocksDB is mainly related to (de)serialization
and disk reads/writes.
2. MapState only needs to (de)serialize the single map key/value being
read or written, while ValueState needs to (de)serialize the whole state on
every read/write (see the sketch below).
3. The disk read/write cost depends roughly on the whole state size.

Best,
Congxian
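
A minimal sketch of the two state shapes being compared, shown side by side in one function purely for illustration (type parameters and names are made up):

```
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class StateShapeSketch extends KeyedProcessFunction<String, String, String> {

    // Option 1: few keys, large per-key state; with RocksDB each map entry is a separate row.
    private transient MapState<String, Long> bigMapPerKey;

    // Option 2: many keys, small per-key state; the whole value is one row.
    private transient ValueState<Long> smallValuePerKey;

    @Override
    public void open(Configuration parameters) {
        bigMapPerKey = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("big-map", Types.STRING, Types.LONG));
        smallValuePerKey = getRuntimeContext().getState(
                new ValueStateDescriptor<>("small-value", Types.LONG));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Only the touched entry is (de)serialized here ...
        bigMapPerKey.put(value, System.currentTimeMillis());
        // ... while this (de)serializes the full value of the current key on every access.
        Long old = smallValuePerKey.value();
        smallValuePerKey.update(old == null ? 1L : old + 1L);
        out.collect(value);
    }
}
```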


KristoffSC wrote on Wed, Apr 8, 2020 at 2:41 AM:

> Hi,
> I would like to ask what has a bigger memory footprint and what could be more
> efficient regarding
> fewer keys with a bigger keyState vs. many keys with a smaller keyState
>
> For this use case I'm using RocksDB StateBackend and state TTL is, well..
> infinitive. So I'm keeping the state forever in Flink.
>
> The use case:
> I have a stream of messages that I have to process in some custom way.
> I can take one of two approaches
>
> 1. use a keyBy that will give me some number of distinct keys but for each
> key, the state size will be significant. It will be MapState in this case.
> The keyBy I used will still give me ability to spread operations across
> operator instances.
>
> 2. In second approach I can use a different keyBy, where I would have huge
> number of distinct keys, but each keyState will be very small and it will
> be
> a ValueState in this case.
>
> To sum up:
> "reasonable" number of keys with very big keySatte VS huge number of keys
> with very small state each.
>
> What are the pros and cons for both?
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Congxian Qiu
Hi
From the stack, it seems the problem is "org.apache.flink.fs.shaded.
hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not
find any valid local directory for s3ablock-0001-". I googled the
exception and found a related page[1]; could you please make sure
there is enough space on the local disk.

[1]
https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
Best,
Congxian


Lu Niu wrote on Wed, Apr 8, 2020 at 8:41 AM:

> Hi, flink users
>
> Did anyone encounter such an error? The error comes from S3AFileSystem, but
> there is no capacity issue on any disk. We are using Hadoop 2.7.1.
> ```
>
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>   ... 3 more
> Caused by: java.io.IOException: Could not open output stream for state backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>   at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>   at 
> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>   at 
> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
>   at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>   ... 5 more
> Caused by: 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>  Could not find any valid local directory for s3ablock-0001-
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.(S3ABlockOutputStream.java:168)
>   at 
> 

Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Congxian Qiu
Thanks a lot for the release and your great job, Gordon!
Also thanks to everyone who made this release possible!

Best,
Congxian


Oytun Tez wrote on Wed, Apr 8, 2020 at 2:55 AM:

> I should also add, I couldn't agree more with this sentence in the release
> article: "state access/updates and messaging need to be integrated."
>
> This is something we strictly enforce in our Flink case, where we do not
> refer to anything external for storage; we use Flink as our DB.
>
>
>
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>
>
> On Tue, Apr 7, 2020 at 12:26 PM Oytun Tez  wrote:
>
>> Great news! Thank you all.
>>
>> On Tue, Apr 7, 2020 at 12:23 PM Marta Paes Moreira 
>> wrote:
>>
>>> Thank you for managing the release, Gordon — you did a tremendous job!
>>> And to everyone else who worked on pushing it through.
>>>
>>> Really excited about the new use cases that StateFun 2.0 unlocks for
>>> Flink users and beyond!
>>>
>>>
>>> Marta
>>>
>>> On Tue, Apr 7, 2020 at 4:47 PM Hequn Cheng  wrote:
>>>
 Thanks a lot for the release and your great job, Gordon!
 Also thanks to everyone who made this release possible!

 Best,
 Hequn

 On Tue, Apr 7, 2020 at 8:58 PM Tzu-Li (Gordon) Tai 
 wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink Stateful Functions 2.0.0.
>
> Stateful Functions is an API that simplifies building distributed
> stateful applications.
> It's based on functions with persistent state that can interact
> dynamically with strong consistency guarantees.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Stateful Functions can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>
> Python SDK for Stateful Functions published to the PyPI index can be
> found at:
> https://pypi.org/project/apache-flink-statefun/
>
> Official Docker image for building Stateful Functions applications is
> currently being published to Docker Hub.
> Dockerfiles for this release can be found at:
> https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
> Progress for creating the Docker Hub repository can be tracked at:
> https://github.com/docker-library/official-images/pull/7749
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346878
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Cheers,
> Gordon
>
 --
>>  --
>>
>> [image: MotaWord]
>> Oytun Tez
>> M O T A W O R D | CTO & Co-Founder
>> oy...@motaword.com
>>
>>   
>>
>


Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Lu Niu
Hi, flink users

Did anyone encounter such an error? The error comes from S3AFileSystem, but
there is no capacity issue on any disk. We are using Hadoop 2.7.1.
```

Caused by: java.util.concurrent.ExecutionException:
java.io.IOException: Could not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
Caused by: java.io.IOException: Could not open output stream for state backend
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at 
java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at 
java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
... 5 more
Caused by: 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
Could not find any valid local directory for s3ablock-0001-
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.(S3ABlockOutputStream.java:168)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
at 
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141)
at 
org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
at 

Re: Anomaly detection Apache Flink

2020-04-07 Thread Salvador Vigo
Ok, thanks for the clarification.


On Tue, Apr 7, 2020, 7:00 PM Nienhuis, Ryan  wrote:

> Vigo,
>
>
>
> I mean that the algorithm is a standalone piece of code. There are no
> examples that I am aware of for running it using Flink.
>
>
>
> Ryan
>
>
>
> *From:* Salvador Vigo 
> *Sent:* Saturday, April 4, 2020 12:26 AM
> *To:* Marta Paes Moreira 
> *Cc:* Nienhuis, Ryan ; user 
> *Subject:* RE: [EXTERNAL] Anomaly detection Apache Flink
>
>
>
>
>
>
> Thanks for the answers.
>
>
>
> @Marta, first, the videos [1], [2]: it was interesting to see these two
> different approaches, although I was looking for a more specific
> implementation. As for link [3], I didn't know Kinesis existed, so
> maybe it could be good for benchmarking and comparing my results with the
> Kinesis results. Then the CEP approach: I am very familiar with this
> topic since my current work is based on the implementation of a CEP
> pipeline for monitoring. The only problem I see here is that you need a
> predefined pattern in advance. But it is worth a try.
>
>
>
> @Ryan, I see the idea of the random cut forest algorithm as closer to
> the idea I am looking for. What do you mean when you say it doesn't help with
> getting it working with Flink?
>
>
>
> Best,
>
>
>
> On Fri, Apr 3, 2020 at 8:47 PM Marta Paes Moreira 
> wrote:
>
> Forgot to mention that you might also want to have a look into Flink CEP
> [1], Flink's library for Complex Event Processing.
>
> It allows you to define and detect event patterns over streams, which can
> come in pretty handy for anomaly detection.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html
>
>
>
> On Fri, Apr 3, 2020 at 6:08 PM Nienhuis, Ryan  wrote:
>
> I would also have a look at the random cut forest algorithm. This is the
> base algorithm that is used for anomaly detection in several AWS services
> (Quicksight, Kinesis Data Analytics, etc.). It doesn’t help with getting it
> working with Flink, but may be a good place to start for an algorithm.
>
>
>
> https://github.com/aws/random-cut-forest-by-aws
>
>
>
> Ryan
>
>
>
> *From:* Marta Paes Moreira 
> *Sent:* Friday, April 3, 2020 5:25 AM
> *To:* Salvador Vigo 
> *Cc:* user 
> *Subject:* RE: [EXTERNAL] Anomaly detection Apache Flink
>
>
>
>
>
>
> Hi, Salvador.
>
> You can find some more examples of real-time anomaly detection with Flink
> in these presentations from Microsoft [1] and Salesforce [2] at Flink
> Forward. This blogpost [3] also describes how to build that kind of
> application using Kinesis Data Analytics (based on Flink).
>
> Let me know if these resources help!
>
> [1] https://www.youtube.com/watch?v=NhOZ9Q9_wwI
> [2] https://www.youtube.com/watch?v=D4kk1JM8Kcg
> [3]
> https://towardsdatascience.com/real-time-anomaly-detection-with-aws-c237db9eaa3f
>
>
>
> On Fri, Apr 3, 2020 at 11:37 AM Salvador Vigo 
> wrote:
>
> Hi there,
>
> I am working on an approach to run some experiments related to anomaly
> detection in real time with Apache Flink. I would like to know if there are
> already some open issues in the community.
>
> The only example I found was the one by Scott Kidder
> and the Mux platform, 2017. If anyone
> is already working on this topic or knows of some related work or
> publication, I will be grateful.
>
> Best,
>
>


Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Oytun Tez
I should also add, I couldn't agree more with this sentence in the release
article: "state access/updates and messaging need to be integrated."

This is something we strictly enforce in our Flink case, where we do not
refer to anything external for storage; we use Flink as our DB.



 --

[image: MotaWord]
Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com

  


On Tue, Apr 7, 2020 at 12:26 PM Oytun Tez  wrote:

> Great news! Thank you all.
>
> On Tue, Apr 7, 2020 at 12:23 PM Marta Paes Moreira 
> wrote:
>
>> Thank you for managing the release, Gordon — you did a tremendous job!
>> And to everyone else who worked on pushing it through.
>>
>> Really excited about the new use cases that StateFun 2.0 unlocks for
>> Flink users and beyond!
>>
>>
>> Marta
>>
>> On Tue, Apr 7, 2020 at 4:47 PM Hequn Cheng  wrote:
>>
>>> Thanks a lot for the release and your great job, Gordon!
>>> Also thanks to everyone who made this release possible!
>>>
>>> Best,
>>> Hequn
>>>
>>> On Tue, Apr 7, 2020 at 8:58 PM Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
 The Apache Flink community is very happy to announce the release of
 Apache Flink Stateful Functions 2.0.0.

 Stateful Functions is an API that simplifies building distributed
 stateful applications.
 It's based on functions with persistent state that can interact
 dynamically with strong consistency guarantees.

 Please check out the release blog post for an overview of the release:
 https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html

 The release is available for download at:
 https://flink.apache.org/downloads.html

 Maven artifacts for Stateful Functions can be found at:
 https://search.maven.org/search?q=g:org.apache.flink%20statefun

 Python SDK for Stateful Functions published to the PyPI index can be
 found at:
 https://pypi.org/project/apache-flink-statefun/

 Official Docker image for building Stateful Functions applications is
 currently being published to Docker Hub.
 Dockerfiles for this release can be found at:
 https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
 Progress for creating the Docker Hub repository can be tracked at:
 https://github.com/docker-library/official-images/pull/7749

 The full release notes are available in Jira:

 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346878

 We would like to thank all contributors of the Apache Flink community
 who made this release possible!

 Cheers,
 Gordon

>>> --
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>


State size Vs keys number performance

2020-04-07 Thread KristoffSC
Hi,
I would like to ask what has a bigger memory footprint and what could be more
efficient regarding 
fewer keys with a bigger keyState vs. many keys with a smaller keyState

For this use case I'm using RocksDB StateBackend and state TTL is, well..
infinitive. So I'm keeping the state forever in Flink.

The use case:
I have a stream of messages that I have to process in some custom way.
I can take one of two approaches

1. use a keyBy that will give me some number of distinct keys but for each
key, the state size will be significant. It will be MapState in this case.
The keyBy I used will still give me ability to spread operations across
operator instances. 

2. In second approach I can use a different keyBy, where I would have huge
number of distinct keys, but each keyState will be very small and it will be
a ValueState in this case.

To sum up:
"reasonable" number of keys with very big keySatte VS huge number of keys
with very small state each.

What are the pros and cons for both?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink incremental checkpointing - how long does data is kept in the share folder

2020-04-07 Thread Yun Tang
Hi Shachar

Why do we see data that is older than the lateness configuration?
There could be three reasons:

  1.  RocksDB really does still need that file in the current checkpoint. If we uploaded 
a file named 42.sst on 2/4 for some old checkpoint, the current checkpoint could still 
include that 42.sst file again if it has never been compacted since then. This is 
possible in theory.
  2.  Your checkpoint size is large and the checkpoint coordinator could not remove 
files fast enough before exiting.
  3.  The file was created by a crashed task manager and is not known to the checkpoint 
coordinator.

How do I know that the files belong to a valid checkpoint and not to a checkpoint 
of a crashed job, so we can delete those files?
You have to call Checkpoints#loadCheckpointMetadata[1] to load the latest _metadata 
file in the checkpoint directory and compare the file paths it references with the files 
currently in the checkpoint directory. The ones that are not in the checkpoint metadata 
and are older than the latest checkpoint can be removed. You could follow this approach 
to debug, or maybe I could write a tool later to help determine which files can be deleted.

[1] 
https://github.com/apache/flink/blob/693cb6adc42d75d1db720b45013430a4c6817d4a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L96

Best
Yun Tang


From: Shachar Carmeli 
Sent: Tuesday, April 7, 2020 16:19
To: user@flink.apache.org 
Subject: Flink incremental checkpointing - how long does data is kept in the 
share folder

We are using Flink 1.6.3, keeping the checkpoints in CEPH, retaining only one 
checkpoint at a time, using incremental checkpoints and RocksDB.

We run windows with a lateness of 3 days, which means we expect that no 
data in the checkpoint share folder is kept after 3-4 days. Still, we see 
that there is data older than that,
e.g.
if today is 7/4 there are some files from 2/4.

Sometimes we see checkpoints that we assume (because their index 
numbers are not in sync) belong to a job that crashed, where the 
checkpoint was not used to restore the job.

My questions are:

Why do we see data that is older than the lateness configuration?
How do I know that the files belong to a valid checkpoint and not to a checkpoint 
of a crashed job, so we can delete those files?


RE: Anomaly detection Apache Flink

2020-04-07 Thread Nienhuis, Ryan
Vigo,

I mean that the algorithm is a standalone piece of code. There are no examples 
that I am aware of for running it using Flink.

Ryan

From: Salvador Vigo 
Sent: Saturday, April 4, 2020 12:26 AM
To: Marta Paes Moreira 
Cc: Nienhuis, Ryan ; user 
Subject: RE: [EXTERNAL] Anomaly detection Apache Flink




Thanks for the answers.

@Marta, first, the videos [1], [2]: it was interesting to see these two 
different approaches, although I was looking for a more specific 
implementation. As for link [3], I didn't know Kinesis existed, so maybe it 
could be good for benchmarking and comparing my results with the Kinesis 
results. Then the CEP approach: I am very familiar with this topic since my 
current work is based on the implementation of a CEP pipeline for monitoring. 
The only problem I see here is that you need a predefined pattern in advance. 
But it is worth a try.

@Ryan, I see the idea of the random cut forest algorithm as closer to the 
idea I am looking for. What do you mean when you say it doesn't help with 
getting it working with Flink?

Best,

On Fri, Apr 3, 2020 at 8:47 PM Marta Paes Moreira <ma...@ververica.com> wrote:
Forgot to mention that you might also want to have a look into Flink CEP [1], 
Flink's library for Complex Event Processing.

It allows you to define and detect event patterns over streams, which can come 
in pretty handy for anomaly detection.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html
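
For reference, a minimal Flink CEP sketch of the kind of pattern that could flag an anomaly (the Event type, its fields and the thresholds are purely illustrative):

```
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AnomalyPatternSketch {

    // Hypothetical event type.
    public static class Event {
        public String sensorId;
        public double temperature;
    }

    public static PatternStream<Event> detect(DataStream<Event> events) {
        // Two consecutive high readings within one minute count as an anomaly.
        Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return e.temperature > 100.0;
                    }
                })
                .next("second")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event e) {
                        return e.temperature > 100.0;
                    }
                })
                .within(Time.minutes(1));

        return CEP.pattern(events, pattern);
    }
}
```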

On Fri, Apr 3, 2020 at 6:08 PM Nienhuis, Ryan <nienh...@amazon.com> wrote:
I would also have a look at the random cut forest algorithm. This is the base 
algorithm that is used for anomaly detection in several AWS services 
(Quicksight, Kinesis Data Analytics, etc.). It doesn’t help with getting it 
working with Flink, but may be a good place to start for an algorithm.

https://github.com/aws/random-cut-forest-by-aws

Ryan

From: Marta Paes Moreira <ma...@ververica.com>
Sent: Friday, April 3, 2020 5:25 AM
To: Salvador Vigo <salvador...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: RE: [EXTERNAL] Anomaly detection Apache Flink




Hi, Salvador.

You can find some more examples of real-time anomaly detection with Flink in 
these presentations from Microsoft [1] and Salesforce [2] at Flink Forward. 
This blogpost [3] also describes how to build that kind of application using 
Kinesis Data Analytics (based on Flink).

Let me know if these resources help!

[1] https://www.youtube.com/watch?v=NhOZ9Q9_wwI
[2] https://www.youtube.com/watch?v=D4kk1JM8Kcg
[3] 
https://towardsdatascience.com/real-time-anomaly-detection-with-aws-c237db9eaa3f

On Fri, Apr 3, 2020 at 11:37 AM Salvador Vigo <salvador...@gmail.com> wrote:
Hi there,
I am working on an approach to run some experiments related to anomaly 
detection in real time with Apache Flink. I would like to know if there are 
already some open issues in the community.
The only example I found was the one by Scott 
Kidder and the Mux platform, 2017. If anyone 
is already working on this topic or knows of some related work or publication, I 
will be grateful.
Best,


Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Oytun Tez
Great news! Thank you all.

On Tue, Apr 7, 2020 at 12:23 PM Marta Paes Moreira 
wrote:

> Thank you for managing the release, Gordon — you did a tremendous job! And
> to everyone else who worked on pushing it through.
>
> Really excited about the new use cases that StateFun 2.0 unlocks for Flink
> users and beyond!
>
>
> Marta
>
> On Tue, Apr 7, 2020 at 4:47 PM Hequn Cheng  wrote:
>
>> Thanks a lot for the release and your great job, Gordon!
>> Also thanks to everyone who made this release possible!
>>
>> Best,
>> Hequn
>>
>> On Tue, Apr 7, 2020 at 8:58 PM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink Stateful Functions 2.0.0.
>>>
>>> Stateful Functions is an API that simplifies building distributed
>>> stateful applications.
>>> It's based on functions with persistent state that can interact
>>> dynamically with strong consistency guarantees.
>>>
>>> Please check out the release blog post for an overview of the release:
>>> https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Maven artifacts for Stateful Functions can be found at:
>>> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>>>
>>> Python SDK for Stateful Functions published to the PyPI index can be
>>> found at:
>>> https://pypi.org/project/apache-flink-statefun/
>>>
>>> Official Docker image for building Stateful Functions applications is
>>> currently being published to Docker Hub.
>>> Dockerfiles for this release can be found at:
>>> https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
>>> Progress for creating the Docker Hub repository can be tracked at:
>>> https://github.com/docker-library/official-images/pull/7749
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346878
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Cheers,
>>> Gordon
>>>
>> --
 --

[image: MotaWord]
Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com

  


Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Marta Paes Moreira
Thank you for managing the release, Gordon — you did a tremendous job! And
to everyone else who worked on pushing it through.

Really excited about the new use cases that StateFun 2.0 unlocks for Flink
users and beyond!

Marta

On Tue, Apr 7, 2020 at 4:47 PM Hequn Cheng  wrote:

> Thanks a lot for the release and your great job, Gordon!
> Also thanks to everyone who made this release possible!
>
> Best,
> Hequn
>
> On Tue, Apr 7, 2020 at 8:58 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink Stateful Functions 2.0.0.
>>
>> Stateful Functions is an API that simplifies building distributed
>> stateful applications.
>> It's based on functions with persistent state that can interact
>> dynamically with strong consistency guarantees.
>>
>> Please check out the release blog post for an overview of the release:
>> https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Maven artifacts for Stateful Functions can be found at:
>> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>>
>> Python SDK for Stateful Functions published to the PyPI index can be
>> found at:
>> https://pypi.org/project/apache-flink-statefun/
>>
>> Official Docker image for building Stateful Functions applications is
>> currently being published to Docker Hub.
>> Dockerfiles for this release can be found at:
>> https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
>> Progress for creating the Docker Hub repository can be tracked at:
>> https://github.com/docker-library/official-images/pull/7749
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346878
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Cheers,
>> Gordon
>>
>


Re: Creating singleton objects per task manager

2020-04-07 Thread Seth Wiesman
Hi Kristoff,

You are correct that that was a typo :)
At most one instance per slot. 

Seth  
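
For the per-task-manager singleton the thread title asks about, one common pattern is a lazily initialized static holder. A hedged sketch, assuming a hypothetical ExpensiveResource and that all subtasks of the job in one TM share the same user-code classloader (which is what makes the static field effectively "per task manager"):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class SharedResourceFunction extends ProcessFunction<String, String> {

    // Hypothetical heavyweight object we only want to create once per TM JVM.
    static class ExpensiveResource {
        String lookup(String key) { return key; }
    }

    // Static fields are per-classloader: all subtasks of this job running in the
    // same task manager (and sharing its user-code classloader) see one instance.
    private static ExpensiveResource shared;

    private static synchronized ExpensiveResource getOrCreate() {
        if (shared == null) {
            shared = new ExpensiveResource();
        }
        return shared;
    }

    private transient ExpensiveResource resource;

    @Override
    public void open(Configuration parameters) {
        resource = getOrCreate();
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect(resource.lookup(value));
    }
}
```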

> On Apr 7, 2020, at 9:41 AM, KristoffSC  wrote:
> 
> Hi Seth,
> I would like to piggyback on this question :)
> 
> You wrote:
> "I would strongly encourage you to create one instance of your object per
> ProcessFunction, inside of open. That would be one instance per slot which
> is not equal to the parallelism of your operator."
> 
> Especially the second part "That would be one instance per slot which is not
> equal to the parallelism of your operator"
> 
> To my understanding, the number of ProcessFunction instances is equal to the
> parallelism of this operator. Parallel instances are not deployed on
> the same task slot; therefore, if you create objects in the open() method,
> you will have as many objects as there are ProcessFunction instances, which
> in my understanding is equal to the parallelism of this operator.
> 
> Thanks,
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink 1.9 conflict jackson version

2020-04-07 Thread Fanbin Bu
Hi Aj,

I found a workaround: putting my app jar inside the /usr/lib/flink/lib directory.
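
If relocating the conflicting Jackson classes is an option (the maven-shade-plugin approach suggested further down the thread), a hedged sketch of the relocation configuration could look like the following; the shaded package prefix is illustrative:

```
<!-- pom.xml (sketch): relocate Jackson inside the fat jar so it cannot clash
     with the Jackson version already on the EMR / Flink classpath -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.fasterxml.jackson</pattern>
            <shadedPattern>com.mycompany.shaded.com.fasterxml.jackson</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```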

On Mon, Apr 6, 2020 at 11:27 PM aj  wrote:

> Hi Fanbin,
>
> I am facing a similar kind of issue. If you are able to
> resolve this issue, please let me know and help me as well.
>
>
> https://stackoverflow.com/questions/61012350/flink-reading-a-s3-file-causing-jackson-dependency-issue
>
>
>
> On Tue, Dec 17, 2019 at 7:50 AM ouywl  wrote:
>
>> Hi Bu
>>    I think you can use the maven-shade-plugin to resolve your problem. It seems
>> flink-client conflicts with your own jar?
>>
>> ouywl
>> ou...@139.com
>>
>> 
>>
>> On 12/17/2019 08:10, Fanbin Bu wrote:
>>
>> Hi,
>>
>> After I upgraded to Flink 1.9, I got the following error message on EMR; it
>> works locally in IntelliJ.
>>
>> I'm explicitly declaring the dependency as
>> implementation
>> 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
>> and I have
>> implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version:
>> '1.11.595'
>>
>>
>>
>> java.lang.NoSuchMethodError: 
>> com.fasterxml.jackson.databind.ObjectMapper.enable([Lcom/fasterxml/jackson/core/JsonParser$Feature;)Lcom/fasterxml/jackson/databind/ObjectMapper;
>>  at 
>> com.amazonaws.partitions.PartitionsLoader.(PartitionsLoader.java:54)
>>  at 
>> com.amazonaws.regions.RegionMetadataFactory.create(RegionMetadataFactory.java:30)
>>  at com.amazonaws.regions.RegionUtils.initialize(RegionUtils.java:65)
>>  at 
>> com.amazonaws.regions.RegionUtils.getRegionMetadata(RegionUtils.java:53)
>>  at com.amazonaws.regions.RegionUtils.getRegion(RegionUtils.java:107)
>>  at 
>> com.amazonaws.client.builder.AwsClientBuilder.getRegionObject(AwsClientBuilder.java:256)
>>  at 
>> com.amazonaws.client.builder.AwsClientBuilder.setRegion(AwsClientBuilder.java:460)
>>  at 
>> com.amazonaws.client.builder.AwsClientBuilder.configureMutableProperties(AwsClientBuilder.java:424)
>>  at 
>> com.amazonaws.client.builder.AwsAsyncClientBuilder.build(AwsAsyncClientBuilder.java:80)
>>  at 
>> com.coinbase.util.KmsClient$.getSnowflakeUsernamePassword(KmsClient.scala:21)
>>  at com.coinbase.ml.RunFlinkJob$.runBatch(RunFlinkJob.scala:94)
>>  at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:38)
>>  at 
>> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
>>  at 
>> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>  at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>>  at java.security.AccessController.doPrivileged(Native Method)
>>  at javax.security.auth.Subject.doAs(Subject.java:422)
>>  at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>>  at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>>
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> 
>
>
> 
>


TCP streams to multiple clients

2020-04-07 Thread Simec, Nick
I'd like to put Flink on a proxy server to read in a stream from an external 
source and then distribute that stream to multiple servers. Is that possible 
with Flink? Would I have to replicate the data or anything?


I want source -> proxy-server -> (FLINK) -> -> -> -> -> 5 servers

Flink reads the stream coming in to the proxy-server and sends it to 5 other 
servers.


Re: Dynamic Flink SQL

2020-04-07 Thread Krzysztof Zarzycki
Hi Maciej, thanks for joining. I answer your comments below.

>
> the idea is quite interesting - although maintaining some coordination to
> be able to handle checkpoints would probably be pretty tricky. Did you figure
> out how to handle proper distribution of tasks between TMs? As far as I
> understand you have to guarantee that all sources reading from cache are on
> the same TM as sinks writing data from Kafka? Or you think about some
> distributed caches?
>
No, we haven't yet figured that out. Yes, I've heard that it is indeed a
problem to force Flink to distribute the tasks the way one wants. I only
hoped that we would be able to solve it when we get there :-)
One of the ideas was to actually use an in-memory grid co-located with Flink
(e.g. based on Apache Ignite), but then the problem of the network shuffle just
moves from Kafka to that grid. Which might be a smaller problem, but still.

> As for your original question - we are also looking for solutions/ideas
> for this problem in Nussknacker. We have similar problem, however we had
> different constraints (on premise, not have to care too much about
> bandwidth) and we went with "one job per scenario". It works ok, but the
> biggest problem for me is that it does not scale with the number of jobs:
> Flink job is quite heavy entity - all the threads, classloaders etc. Having
> more than a few dozens of jobs is also not so easy to handle on JobManager
> part - especially when it's restarted etc. I guess your idea would also
> suffer from this problem?
>
Unfortunately yes, good point. Maybe it can be mitigated by distributing the jobs
among several Flink clusters. The solution would be globally heavier, but
lighter on each cluster. Then the data must go to the distributed in-memory
grid, with hopefully local reads only.

I see a lot of difficulties in the discussed approach. But my willingness
to use the SQL/Table API and CEP is so strong that I want to do the PoC regardless.
I hope we will be able to provide benchmarks which prove that the
performance of such an approach is significantly better, justifying the work by
us, and maybe also the community, on overcoming these difficulties.
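
For reference, a minimal sketch of the experimental reinterpretAsKeyedStream API mentioned in the quoted PoC description below; it assumes the input really is partitioned exactly as Flink's keyBy would partition it (the fromElements source here is only a stand-in for the cache-reading source):

```
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReinterpretSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the source that reads already-partitioned records from the cache.
        DataStream<String> cached = env.fromElements("a", "b", "c");

        // Declare the stream as already keyed so downstream keyed operators
        // can run without another network shuffle.
        KeyedStream<String, String> keyed = DataStreamUtils.reinterpretAsKeyedStream(
                cached,
                new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                });

        keyed.print();
        env.execute("reinterpret-sketch");
    }
}
```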

>
> thanks,
>
> maciek
>
>
>
> On 27/03/2020 10:18, Krzysztof Zarzycki wrote:
>
> I want to do a bit different hacky PoC:
> * I will write a sink, that caches the results in "JVM global" memory.
> Then I will write a source, that reads this cache.
> * I will launch one job, that reads from Kafka source, shuffles the data
> to the desired partitioning and then sinks to that cache.
> * Then I will launch multiple jobs (DataStream based or Flink SQL based),
> that uses the source from cache to read the data out and then reinterprets
> it as keyed stream [1].
> * Using JVM global memory is necessary, because AFAIK the jobs use
> different classloaders. The class of cached object also needs to be
> available in the parent classloader i.e. in the cluster's classpath.
> This is just to prove the idea, the performance and usefulness of it. All
> the problems of checkpointing this data I will leave for later.
>
> I'm very very interested in your, community, comments about this idea and
> later productization of it.
> Thanks!
>
> Answering your comments:
>
>> Unless you need reprocessing for newly added rules, I'd probably just
>> cancel with savepoint and restart the application with the new rules. Of
>> course, it depends on the rules themselves and how much state they require
>> if a restart is viable. That's up to a POC.
>>
> No, I don't need reprocessing (yet). The rule starts processing the data
> from the moment it is defined.
> The cancellation with savepoint was considered, but because the number of
> new rules defined/changed daily might be large enough, that will generate
> too much of downtime. There is a lot of state kept in those rules making
> the restart heavy. What's worse, that would be cross-tenant downtime,
> unless the job was somehow per team/tenant. Therefore we reject this option.
> BTW, the current design of our system is similar to the one from the blog
> series by Alexander Fedulov about dynamic rules pattern [2] he's just
> publishing.
>
>
>> They will consume the same high intensive source(s) therefore I want to
>>> optimize for that by consuming the messages in Flink only once.
>>>
>> That's why I proposed to run one big query instead of 500 small ones.
>> Have a POC where you add two of your rules manually to a Table and see how
>> the optimized logical plan looks like. I'd bet that the source is only
>> tapped once.
>>
>
> I can do that PoC, no problem. But AFAIK it will only work with the
> "restart with savepoint" pattern discussed above.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> [2] https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html
>
>
>
>> On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki 
>> wrote:
>>
>>> Hello Arvid,
>>> Thanks for joining to 

Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Hequn Cheng
Thanks a lot for the release and your great job, Gordon!
Also thanks to everyone who made this release possible!

Best,
Hequn

On Tue, Apr 7, 2020 at 8:58 PM Tzu-Li (Gordon) Tai 
wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Stateful Functions 2.0.0.
>
> Stateful Functions is an API that simplifies building distributed stateful
> applications.
> It's based on functions with persistent state that can interact
> dynamically with strong consistency guarantees.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Stateful Functions can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>
> Python SDK for Stateful Functions published to the PyPI index can be found
> at:
> https://pypi.org/project/apache-flink-statefun/
>
> Official Docker image for building Stateful Functions applications is
> currently being published to Docker Hub.
> Dockerfiles for this release can be found at:
> https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
> Progress for creating the Docker Hub repository can be tracked at:
> https://github.com/docker-library/official-images/pull/7749
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346878
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Cheers,
> Gordon
>


Re: Creating singleton objects per task manager

2020-04-07 Thread KristoffSC
Hi Seth,
I would like to piggyback on this question :)

You wrote:
"I would strongly encourage you to create one instance of your object per
ProcessFunction, inside of open. That would be one instance per slot which
is not equal to the parallelism of your operator."

Especially the second part "That would be one instance per slot which is not
equal to the parallelism of your operator"

To my understanding, the number of ProcessFunction instances is equal to the
parallelism of this operator. Parallel instances are not deployed on the same
task slot, so if you create objects in the open() method you will have as many
objects as there are ProcessFunction instances, which, as I understand it,
is equal to the parallelism of this operator.
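To make the two options concrete, a small hedged sketch (the class and object names are made up): option 1 creates one object per parallel subtask inside open(), while option 2 shares a single object per TaskManager JVM across all slots via a static holder:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical heavy object we want to share; the name is illustrative only.
class ExpensiveModel {
    private static volatile ExpensiveModel instance;

    // Lazily create one instance per JVM (i.e. per TaskManager), shared by all slots/subtasks.
    static ExpensiveModel getOrCreate() {
        if (instance == null) {
            synchronized (ExpensiveModel.class) {
                if (instance == null) {
                    instance = new ExpensiveModel();
                }
            }
        }
        return instance;
    }
}

public class MyProcessFunction extends ProcessFunction<String, String> {

    private transient ExpensiveModel model;

    @Override
    public void open(Configuration parameters) {
        // Option 1: one object per parallel subtask (per ProcessFunction instance).
        // this.model = new ExpensiveModel();

        // Option 2: one object per TaskManager JVM, via a static holder.
        this.model = ExpensiveModel.getOrCreate();
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect(value);
    }
}

Which of the two is appropriate depends on whether the object is thread-safe, since with option 2 it is shared by subtasks running in different slots of the same JVM.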

Thanks,



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Creating singleton objects per task manager

2020-04-07 Thread Salva Alcántara
Thanks a lot Seth!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Pulsar as a state backend

2020-04-07 Thread Michael Colson
Hello Markos,

Thanks for the quick answer and the good news.
I'll follow the progress and the FLIP-72.

Thanks,

On Tue, Apr 7, 2020 at 03:50, Markos Sfikas  wrote:

> Hi Michael,
>
> Thanks for the question regarding the Pulsar Connector.
>
> The team is currently focusing on further developing the Flink-Pulsar
> connector with the following items:
>
>- Addition of a columnar off-loader to improve the performance for
>batch queries.The columnar segment off-loader is currently in progress
>and expected to be out sometime in May.
>- Work on state integration is also in progress and expected sometime
>in May.
>- Using Pulsar as a Flink state backend is something that the team
>will also be working on once the off-loader (mentioned above) is complete
>so this should be expected sometime in June.
>
> All such development efforts will be part of the Pulsar Connector, with
> the intention to be contributed back to the Flink community.
>
> You can follow the progress across all development efforts in FLIP-72[1]
>
> I hope this helps.
>
> Best
> Markos
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-72%3A+Introduce+Pulsar+Connector
>
> On Mon, 6 Apr 2020 at 17:24, Michael Colson  wrote:
>
>> Hello,
>>
>> I recently browsed this post:
>> https://flink.apache.org/2019/05/03/pulsar-flink.html
>> and mainly :
>>
>> *Finally, an alternative way of integrating the technologies could
>> include using Pulsar as a state backend with Flink. Since Pulsar has a
>> layered architecture (Streams and Segmented Streams, powered by Apache
>> Bookkeeper), it becomes natural to use Pulsar as a storage layer and store
>> Flink state.*
>>
>>
>> Are there any advanced specifications or efforts in the current roadmap
>> for this integration ?
>>
>> Thanks,
>>
>> --
>> You received this electronic message as part of a business or employment
>> relationship with one or several S4M entities. Its content is strictly
>> confidential and is covered by the obligation of confidentiality and
>> business secrecy. Any dissemination, copying, printing distribution,
>> retention or use of the message’s content or any attachments that could be
>> detrimental to S4M is forbidden, even if it was forwarded by mailing lists.
>>
>> If you are not the intended recipient, please notify the sender of the
>> error without delay and delete permanently this email and any files from
>> your system and destroy any printed copies.
>>
>
>
>



Re: State Processor API with Beam

2020-04-07 Thread Stephen Patel
Thanks Seth, I'll look into rolling my own KeyedStateInputFormat.
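For anyone else following along, the vanilla (non-Beam) read path looks roughly like the sketch below, assuming Flink 1.9's State Processor API; the savepoint path, operator uid and state names are placeholders. The Beam-on-Flink case needs more than this precisely because Beam stores state under String namespaces instead of VoidNamespace, which is why a custom operator/input format is required:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadKeyedStateSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Savepoint path, checkpoint URI and operator uid are placeholders.
        ExistingSavepoint savepoint = Savepoint.load(
                bEnv, "s3://bucket/path/to/savepoint", new RocksDBStateBackend("file:///tmp/ckpt"));

        DataSet<String> rows = savepoint.readKeyedState("my-operator-uid", new ReaderFn());
        rows.print();
    }

    static class ReaderFn extends KeyedStateReaderFunction<String, String> {

        private transient ValueState<Long> counter;

        @Override
        public void open(Configuration parameters) {
            // The descriptor must match the one used by the running job.
            counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + "=" + counter.value());
        }
    }
}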

On Mon, Apr 6, 2020 at 2:50 PM Seth Wiesman  wrote:

> Hi Stephen,
>
> You will need to implement a custom operator and user the `transform`
> method. It's not just that you need to specify the namespace type but you
> will also need to look into the beam internals to see how it stores data in
> flink state, how it translates between beam serializers and flink
> serializers, etc.
>
> Seth
>
> On Mon, Apr 6, 2020 at 1:02 PM Stephen Patel  wrote:
>
>> I've got an apache beam pipeline running on flink (1.9.1).
>>
>> I've been attempting to read a RocksDB savepoint taken from this
>> beam-on-flink pipeline, using the state processor api, however it seems to
>> have some incompatibilities around namespaces.  Beam for instance uses a
>> String namespace, while the KeyedStateInputFormat uses the
>> VoidNamespace. This manifests as an exception:
>>
>> Caused by: org.apache.flink.util.StateMigrationException: The new namespace 
>> serializer must be compatible.
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:524)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
>>  at 
>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>  at 
>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>>  at 
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>>  at 
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
>>  at 
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>  at 
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
>>
>> Is there any way to let the namespace type (and value) be specified by
>> the user?
>>
>


[ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Stateful Functions 2.0.0.

Stateful Functions is an API that simplifies building distributed stateful
applications.
It's based on functions with persistent state that can interact dynamically
with strong consistency guarantees.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at:
https://pypi.org/project/apache-flink-statefun/

Official Docker image for building Stateful Functions applications is
currently being published to Docker Hub.
Dockerfiles for this release can be found at:
https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
Progress for creating the Docker Hub repository can be tracked at:
https://github.com/docker-library/official-images/pull/7749

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346878

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: Re: flink consumes Kafka from the beginning to catch up when new computation logic is added

2020-04-07 Thread LakeShen
Hi 苟刚,

In a Flink job, if checkpointing is enabled, the offsets are committed after each checkpoint
completes. If it is not enabled, offsets are committed by Kafka's auto-commit, which is enabled by default with an interval of 5 s.
As for the job starting from the beginning every time, my guess is that the code sets the consumer to start from the earliest offset, i.e. you are using this method: setStartFromEarliest[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

Best,
LakeShen
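To illustrate the above, a minimal hedged sketch (FlinkKafkaConsumer011 from the 1.6.x connector assumed; XlogStreamBasicBean and MetricSchema are classes from the code quoted below): with checkpointing enabled the offsets are committed back to Kafka on completed checkpoints, and setStartFromGroupOffsets() resumes from those committed offsets instead of the earliest ones.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class ResumeFromCommittedOffsets {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // offsets are committed to Kafka on completed checkpoints

        Properties props = new Properties(); // bootstrap.servers, group.id, ... as in the quoted code
        FlinkKafkaConsumer011<XlogStreamBasicBean> consumer =
                new FlinkKafkaConsumer011<>("xlog-topic", new MetricSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true); // the default, shown here for clarity
        consumer.setStartFromGroupOffsets();          // resume from committed group offsets, not earliest
        env.addSource(consumer);
    }
}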

gang.gou wrote on Tue, Apr 7, 2020 at 4:17 PM:

> OK, I'll give it a try and share the results with everyone once I have them. Thanks!
>
> On 2020/4/7 at 3:52 PM, "Evan" 163@flink.apache.org (on behalf of chengyanan1...@foxmail.com) wrote:
>
> The code in my previous mail seems to have been garbled, so I adjusted it and am resending it. My suggestion is
> that after you obtain the consumer, you also call consumer.setStartFromLatest();. This setting follows what the
> official documentation describes; I translated it earlier, see the part about "Kafka Consumers start position
> configuration": https://www.jianshu.com/p/b753527b91a6
>
>
>
> /**
>  * @param env
>  * @param topic
>  * @param time the subscription time
>  * @return
>  * @throws IllegalAccessException
>  */
> public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env, String topic, Long time) throws IllegalAccessException {
>     ParameterTool parameterTool = (ParameterTool) env.getConfig().getGlobalJobParameters();
>     Properties props = buildKafkaProps(parameterTool);
>
>     FlinkKafkaConsumer011<XlogStreamBasicBean> consumer = new FlinkKafkaConsumer011<>(
>             topic,
>             new MetricSchema(),
>             props);
>
>     consumer.setStartFromLatest();
>
>     consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<XlogStreamBasicBean>() {
>         @Override
>         public long extractAscendingTimestamp(XlogStreamBasicBean element) {
>             if (element == null || element.getTimestamp() == null) {
>                 return System.currentTimeMillis();
>             }
>             return element.getTimestamp() - 1;
>         }
>     });
>     return env.addSource(consumer);
> }
>
> }
>
>
>
>
>
> -- Original message --
> From: "苟刚"  Sent: Tuesday, April 7, 2020, 11:27 AM
> To: "user-zh"
> Subject: flink consumes Kafka from the beginning to catch up when new computation logic is added
>
>
>
> Hello,
>
> I have run into a problem. I use flink for real-time statistics, and every time I add a new computation type,
> the operators start consuming Kafka from the earliest messages, so after every restart it takes a very long time
> to catch up. Is there any way to solve this?
> My watermark is assigned on the Kafka consumer, and every newly added process below starts consuming from the beginning.
>
>
> flink version: 1.6.3
>
> Part of the code is as follows:
>
> public static void main(String[] args) throws Exception {
>     final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
>     StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
>
>     DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);
>     // process timing data
>     processTimingData(parameterTool, data);
>     // process front error data
>     processFrontErrorData(parameterTool, data);
>     // process img error data
>     processImgLoadErrorData(parameterTool, data);
>     env.execute("xlog compute");
> }
>
>
>
>
> Kafka connection parameter configuration:
> public static Properties buildKafkaProps(ParameterTool parameterTool) {
>  Properties props = parameterTool.getProperties();
>  props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS,
> DEFAULT_KAFKA_BROKERS));
>  props.put("zookeeper.connect",
> parameterTool.get(KAFKA_ZOOKEEPER_CONNECT,
> DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
>  props.put("group.id", parameterTool.get(KAFKA_GROUP_ID,
> DEFAULT_KAFKA_GROUP_ID));
>  props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>  props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>  props.put("auto.offset.reset", "latest");
> return props;
> }
>
>
>
>
>
>
>
> --
>
> Best Wishes
>  Galen.K
>
>
>


Re: How to merge a binlog stream and a table stream?

2020-04-07 Thread 刘宇宝
Splitting it into two jobs is a new idea to me, but it doesn't feel quite worth it. Perhaps this is a place where
Flink's current API, or rather its programming model, is quite limited: my source data simply comes from two places,
I need to merge the two sources, and all downstream processing is identical. Under a loose actor model I could
coordinate between two SourceActors: after one SourceActor finishes sending it notifies the other SourceActor to
start (or a new SourceActor is started), and both send messages to the same downstream actor.

Someone on user@flink mentioned InputSelectable, but I haven't figured out how it could be used on a DataStream;
it seems to be implemented only on StreamOperator:
https://github.com/apache/flink/search?p=2=InputSelectable_q=InputSelectable

A clumsy workaround I have come up with is to implement a SourceFunction that wraps FlinkKafkaConsumerBase and
JDBCInputFormat together, so that the JDBCInputFormat data is emitted first and FlinkKafkaConsumerBase only
afterwards. But that only works with parallelism 1; with higher parallelism a distributed barrier would be needed,
which Flink does not support out of the box, so it doesn't feel like an elegant solution.

Thank you very much for your answers!
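For what it's worth, a very rough sketch of that "snapshot first, then binlog" wrapper idea, at parallelism 1 only. To stay short it uses a plain kafka-clients consumer instead of wrapping FlinkKafkaConsumerBase, which means Flink's offset checkpointing and exactly-once guarantees are lost, so this only illustrates the ordering, not a production solution (connection strings, topic and record handling are placeholders; poll(Duration) assumes kafka-clients 2.x):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SnapshotThenBinlogSource extends RichSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Phase 1: emit the full JDBC snapshot of the table.
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://host/test", "user", "pwd");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name FROM tableA")) {
            while (running && rs.next()) {
                ctx.collect("insert," + rs.getLong(1) + "," + rs.getString(2));
            }
        }
        // Phase 2: only now start consuming the binlog topic.
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        props.put("group.id", "snapshot-then-binlog");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("mysql_server.test.tableA"));
            while (running) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    ctx.collect(record.value());
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}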



On 2020/4/7, 4:29 PM, "Jark Wu"  wrote:

如果你的作业没有 state,那么 全量和增量部分,可以分成两个独立的作业。
如果你的作业有 state,主要就是设计 state 复用的问题。两个作业的拓扑结构需要确保一致(为了复用 savepoint),一个作业的
source operator 是 jdbc,另一个 source operator 是 kafka。
当运行完 jdbc 作业后,对作业触发一次 savepoint,然后用这个 savepoint 恢复 kafka 作业,可以从 earliest
开始读取(假设作业支持幂等)。

这里的一个问题,主要是如何对 jdbc 作业触发 savepoint,因为 jdbc InputFormat 目前是
bounded,所以读完后整个作业就结束了,就无法进行 savepoint。
所以这里可能需要自己修改下源码,让 jdbc source 永远不要结束,但通过日志或者 metric 或其他方式通知外界数据已经读完(可以开始触发
savepoint)。

希望这些可以帮助到你。

Best.
Jark


On Tue, 7 Apr 2020 at 16:14, 刘宇宝  wrote:

> I read the bootstrap section of that StreamSQL post but didn't understand how it handles this. I already have
> the mysql table, which can be seen as a materialized view, and I also have the binlog data in Kafka. The tricky
> part is how to "consume the jdbc table first, and switch to kafka after it is fully consumed": from the Flink
> docs, once I add both the jdbc table stream and the kafka stream to the env, after env.execute() both streams
> start emitting data downstream at the same time, while what I expect is that the kafka stream only starts
> emitting after the jdbc table stream has finished.
>
> Thanks!
>
> On 2020/4/7, 2:16 PM, "Jark Wu"  wrote:
>
> Hi,
>
> Is the merge here done with a join? If so, it will be quite costly performance-wise.
>
> One approach is to consume the jdbc table first, and switch to kafka after it is fully consumed. This requires
> the binlog to be idempotent, because part of the binlog will be processed more than once; it is not possible to
> switch to the exact kafka offset.
>
> You can also take a look at how StreamSQL does its bootstrap:
> https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar
>
> Best,
> Jark
>
>
> On Sun, 5 Apr 2020 at 22:48, 刘宇宝  wrote:
>
> > Hi all,
> >
> > I am using Debezium to read the latest binlog from a database and write it into Kafka; for example,
> > mysql_server.test.tableA has a topic "mysql_server.test.tableA". I need to implement the following logic in Flink:
> >
> >
> >   1.  First connect to Kafka and start consuming topic "mysql_server.test.tableA", making sure the connection
> > succeeds; call this binlog-stream, but pause consuming from Kafka;
> >   2.  Read test.tableA into a DataStream with JDBCInputFormat; call this table-stream;
> >   3.  Merge the two streams, and only start consuming binlog-stream after table-stream has been fully consumed,
> > so the binlog is guaranteed to be applied *after* the snapshot of the table.
> >
> > The question is: how can I pause consuming binlog-stream? The approach I have come up with so far is to keep a
> > global flag startBinlog in flink state, initialized to false:
> >
> >   binlog-stream -> waitOperator   ->   sinkOperator
> >   table-stream -> notifyOperator -> sinkOperator
> >
> > Both streams are merged into sinkOperator. waitOperator() would block in a while loop checking the global flag;
> > once table-stream is fully consumed (I don't know how to tell when that happens...), notifyOperator flips the
> > global flag so that binlog-stream can continue to be consumed.
> >
> > But since a kafka consumer that stays blocked for a long time without acking gets disconnected, this approach
> > probably won't work.
> >
> > Any advice on how to get around this?
> >
> > Thanks!
> >
> >
>
>
>




Re: How to merge a binlog stream and a table stream?

2020-04-07 Thread Jark Wu
If your job has no state, the full (snapshot) part and the incremental part can be split into two independent jobs.
If your job has state, the main problem is designing for state reuse. The topologies of the two jobs need to be
identical (so the savepoint can be reused); one job's source operator is jdbc, the other's source operator is kafka.
After the jdbc job finishes, trigger a savepoint on it, then restore the kafka job from that savepoint and read from
earliest (assuming the job is idempotent).

One issue here is how to trigger a savepoint on the jdbc job: since the jdbc InputFormat is currently bounded, the
whole job finishes as soon as reading is done, and then no savepoint can be taken.
So you may need to modify the source code a bit so that the jdbc source never finishes, while notifying the outside
world through logs, metrics or some other means that the data has been fully read (so the savepoint can be triggered).

Hope this helps.

Best.
Jark
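A rough sketch of the "jdbc source that never finishes" idea mentioned above (query, types and logging are placeholders): the source emits the snapshot, logs that it is done, and then idles so the job stays RUNNING and a savepoint can be triggered externally, e.g. with "flink savepoint <jobId>".

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NeverEndingJdbcSource extends RichSourceFunction<Row> {

    private static final Logger LOG = LoggerFactory.getLogger(NeverEndingJdbcSource.class);

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://host/test", "user", "pwd");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name FROM tableA")) {
            while (running && rs.next()) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(Row.of(rs.getLong(1), rs.getString(2)));
                }
            }
        }
        LOG.info("JDBC snapshot fully emitted; it is now safe to trigger a savepoint");
        // Do not return: idle so the job stays RUNNING until it is cancelled.
        while (running) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}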


On Tue, 7 Apr 2020 at 16:14, 刘宇宝  wrote:

> I read the bootstrap section of that StreamSQL post but didn't understand how it handles this. I already have
> the mysql table, which can be seen as a materialized view, and I also have the binlog data in Kafka. The tricky
> part is how to "consume the jdbc table first, and switch to kafka after it is fully consumed": from the Flink
> docs, once I add both the jdbc table stream and the kafka stream to the env, after env.execute() both streams
> start emitting data downstream at the same time, while what I expect is that the kafka stream only starts
> emitting after the jdbc table stream has finished.
>
> Thanks!
>
> On 2020/4/7, 2:16 PM, "Jark Wu"  wrote:
>
> Hi,
>
> Is the merge here done with a join? If so, it will be quite costly performance-wise.
>
> One approach is to consume the jdbc table first, and switch to kafka after it is fully consumed. This requires
> the binlog to be idempotent, because part of the binlog will be processed more than once; it is not possible to
> switch to the exact kafka offset.
>
> You can also take a look at how StreamSQL does its bootstrap:
> https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar
>
> Best,
> Jark
>
>
> On Sun, 5 Apr 2020 at 22:48, 刘宇宝  wrote:
>
> > Hi all,
> >
> > I am using Debezium to read the latest binlog from a database and write it into Kafka; for example,
> > mysql_server.test.tableA has a topic "mysql_server.test.tableA". I need to implement the following logic in Flink:
> >
> >
> >   1.  First connect to Kafka and start consuming topic "mysql_server.test.tableA", making sure the connection
> > succeeds; call this binlog-stream, but pause consuming from Kafka;
> >   2.  Read test.tableA into a DataStream with JDBCInputFormat; call this table-stream;
> >   3.  Merge the two streams, and only start consuming binlog-stream after table-stream has been fully consumed,
> > so the binlog is guaranteed to be applied *after* the snapshot of the table.
> >
> > The question is: how can I pause consuming binlog-stream? The approach I have come up with so far is to keep a
> > global flag startBinlog in flink state, initialized to false:
> >
> >   binlog-stream -> waitOperator   ->   sinkOperator
> >   table-stream -> notifyOperator -> sinkOperator
> >
> > Both streams are merged into sinkOperator. waitOperator() would block in a while loop checking the global flag;
> > once table-stream is fully consumed (I don't know how to tell when that happens...), notifyOperator flips the
> > global flag so that binlog-stream can continue to be consumed.
> >
> > But since a kafka consumer that stays blocked for a long time without acking gets disconnected, this approach
> > probably won't work.
> >
> > Any advice on how to get around this?
> >
> > Thanks!
> >
> >
>
>
>


Flink incremental checkpointing - how long is data kept in the shared folder

2020-04-07 Thread Shachar Carmeli
We are using Flink 1.6.3, keeping the checkpoints in CEPH, retaining only one
checkpoint at a time, using incremental checkpoints and RocksDB.

We run windows with a lateness of 3 days, which means we expect that no data in
the checkpoint shared folder is kept for more than 3-4 days. Still, we see data
that is older than that,
e.g.
if today is 7/4 there are some files from 2/4.

Sometimes we see checkpoints that we assume (because their index number is out
of sync) belong to a job that crashed, where the checkpoint was not used to
restore the job.

My questions are

Why do we see data that is older than the lateness configuration?
How do I know which files belong to a valid checkpoint and not to a checkpoint
of a crashed job, so that we can delete those files?


Re: Re: flink consumes Kafka from the beginning to catch up when new computation logic is added

2020-04-07 Thread gang.gou
OK, I'll give it a try and share the results with everyone once I have them. Thanks!

On 2020/4/7 at 3:52 PM, "Evan" wrote:

The code in my previous mail seems to have been garbled, so I adjusted it and am resending it. My suggestion is
that after you obtain the consumer, you also call consumer.setStartFromLatest();. This setting follows what the
official documentation describes; I translated it earlier, see the part about "Kafka Consumers start position
configuration": https://www.jianshu.com/p/b753527b91a6



/**
  * @param env
  * @param topic
  * @param time the subscription time
  * @return
  * @throws IllegalAccessException
  */
  public static DataStreamSource

flink sql client not able to read parquet format table

2020-04-07 Thread wangl...@geekplus.com.cn

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



Re: How to merge a binlog stream and a table stream?

2020-04-07 Thread 刘宇宝
I read the bootstrap section of that StreamSQL post but didn't understand how it handles this. I already have the
mysql table, which can be seen as a materialized view, and I also have the binlog data in Kafka. The tricky part is
how to "consume the jdbc table first, and switch to kafka after it is fully consumed": from the Flink docs, once I
add both the jdbc table stream and the kafka stream to the env, after env.execute() both streams start emitting data
downstream at the same time, while what I expect is that the kafka stream only starts emitting after the jdbc table
stream has finished.

Thanks!

On 2020/4/7, 2:16 PM, "Jark Wu"  wrote:

Hi,

Is the merge here done with a join? If so, it will be quite costly performance-wise.

One approach is to consume the jdbc table first, and switch to kafka after it is fully consumed. This requires the
binlog to be idempotent, because part of the binlog will be processed more than once; it is not possible to switch
to the exact kafka offset.

You can also take a look at how StreamSQL does its bootstrap:
https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar

Best,
Jark


On Sun, 5 Apr 2020 at 22:48, 刘宇宝  wrote:

> Hi all,
>
> I am using Debezium to read the latest binlog from a database and write it into Kafka; for example,
> mysql_server.test.tableA has a topic "mysql_server.test.tableA". I need to implement the following logic in Flink:
>
>
>   1.  First connect to Kafka and start consuming topic "mysql_server.test.tableA", making sure the connection
> succeeds; call this binlog-stream, but pause consuming from Kafka;
>   2.  Read test.tableA into a DataStream with JDBCInputFormat; call this table-stream;
>   3.  Merge the two streams, and only start consuming binlog-stream after table-stream has been fully consumed,
> so the binlog is guaranteed to be applied *after* the snapshot of the table.
>
> The question is: how can I pause consuming binlog-stream? The approach I have come up with so far is to keep a
> global flag startBinlog in flink state, initialized to false:
>
>   binlog-stream -> waitOperator   ->   sinkOperator
>   table-stream -> notifyOperator -> sinkOperator
>
> Both streams are merged into sinkOperator. waitOperator() would block in a while loop checking the global flag;
> once table-stream is fully consumed (I don't know how to tell when that happens...), notifyOperator flips the
> global flag so that binlog-stream can continue to be consumed.
>
> But since a kafka consumer that stays blocked for a long time without acking gets disconnected, this approach
> probably won't work.
>
> Any advice on how to get around this?
>
> Thanks!
>
>




Re: How to merge a binlog stream and a table stream?

2020-04-07 Thread 刘宇宝
No join, just a simple union:


DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
DataStream snapshotStream = 
env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);
 
// map() is to convert two streams into same type:  (action,  fields…), 
 where action is “insert”, “update”, “delete”.  The action for “snapshotStream” 
is always “insert”.
DataStream tableStream = 
binlogStream.map(…).union(snapshotStream.map(…));
 
tableStream.print();
env.execute(“example”);

I want the downstream to see only one stream, in which all messages from snapshotStream appear first, and only
after those are done does it start reading from the binlog. But in the code above I cannot control which of
snapshotStream and binlogStream finishes emitting first.

How exactly would you do the "switch to Kafka after it is fully consumed"? There is no hook exposed for the
"fully consumed" event of a DataStream, and it seems the DataStream DAG cannot be changed once it has been built??

Idempotence of snapshot + binlog can be guaranteed: the binlog's insert/update/delete always overrides the snapshot.

Someone on the user@flink mailing list mentioned side inputs, which is very close to what I need: binlogStream
would open a side input, finish reading snapshotStream, and only then emit its own (binlogStream) messages, but
unfortunately that feature is not finished yet.


Thanks!


On 2020/4/7, 2:16 PM, "Jark Wu"  wrote:

Hi,

Is the merge here done with a join? If so, it will be quite costly performance-wise.

One approach is to consume the jdbc table first, and switch to kafka after it is fully consumed. This requires the
binlog to be idempotent, because part of the binlog will be processed more than once; it is not possible to switch
to the exact kafka offset.

You can also take a look at how StreamSQL does its bootstrap:
https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar

Best,
Jark


On Sun, 5 Apr 2020 at 22:48, 刘宇宝  wrote:

> Hi all,
>
> I am using Debezium to read the latest binlog from a database and write it into Kafka; for example,
> mysql_server.test.tableA has a topic "mysql_server.test.tableA". I need to implement the following logic in Flink:
>
>
>   1.  First connect to Kafka and start consuming topic "mysql_server.test.tableA", making sure the connection
> succeeds; call this binlog-stream, but pause consuming from Kafka;
>   2.  Read test.tableA into a DataStream with JDBCInputFormat; call this table-stream;
>   3.  Merge the two streams, and only start consuming binlog-stream after table-stream has been fully consumed,
> so the binlog is guaranteed to be applied *after* the snapshot of the table.
>
> The question is: how can I pause consuming binlog-stream? The approach I have come up with so far is to keep a
> global flag startBinlog in flink state, initialized to false:
>
>   binlog-stream -> waitOperator   ->   sinkOperator
>   table-stream -> notifyOperator -> sinkOperator
>
> Both streams are merged into sinkOperator. waitOperator() would block in a while loop checking the global flag;
> once table-stream is fully consumed (I don't know how to tell when that happens...), notifyOperator flips the
> global flag so that binlog-stream can continue to be consumed.
>
> But since a kafka consumer that stays blocked for a long time without acking gets disconnected, this approach
> probably won't work.
>
> Any advice on how to get around this?
>
> Thanks!
>
>




Re: Pulsar as a state backend

2020-04-07 Thread Markos Sfikas
Hi Michael,

Thanks for the question regarding the Pulsar Connector.

The team is currently focusing on further developing the Flink-Pulsar
connector with the following items:

   - Addition of a columnar off-loader to improve the performance for batch
   queries. The columnar segment off-loader is currently in progress and
   expected to be out sometime in May.
   - Work on state integration is also in progress and expected sometime in
   May.
   - Using Pulsar as a Flink state backend is something that the team will
   also be working on once the off-loader (mentioned above) is complete so
   this should be expected sometime in June.

All such development efforts will be part of the Pulsar Connector, with
the intention to be contributed back to the Flink community.

You can follow the progress across all development efforts in FLIP-72[1]

I hope this helps.

Best
Markos

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-72%3A+Introduce+Pulsar+Connector

On Mon, 6 Apr 2020 at 17:24, Michael Colson  wrote:

> Hello,
>
> I recently browsed this post:
> https://flink.apache.org/2019/05/03/pulsar-flink.html
> and mainly :
>
> *Finally, an alternative way of integrating the technologies could include
> using Pulsar as a state backend with Flink. Since Pulsar has a layered
> architecture (Streams and Segmented Streams, powered by Apache Bookkeeper),
> it becomes natural to use Pulsar as a storage layer and store Flink state.*
>
>
> Are there any advanced specifications or efforts in the current roadmap
> for this integration ?
>
> Thanks,
>
> --
> You received this electronic message as part of a business or employment
> relationship with one or several S4M entities. Its content is strictly
> confidential and is covered by the obligation of confidentiality and
> business secrecy. Any dissemination, copying, printing distribution,
> retention or use of the message’s content or any attachments that could be
> detrimental to S4M is forbidden, even if it was forwarded by mailing lists.
>
> If you are not the intended recipient, please notify the sender of the
> error without delay and delete permanently this email and any files from
> your system and destroy any printed copies.
>


Re: flink consumes Kafka from the beginning to catch up when new computation logic is added

2020-04-07 Thread Evan
The code in my previous mail seems to have been garbled, so I adjusted it and am resending it. My suggestion is
that after you obtain the consumer, you also call consumer.setStartFromLatest();. This setting follows what the
official documentation describes; I translated it earlier, see the part about "Kafka Consumers start position
configuration": https://www.jianshu.com/p/b753527b91a6



/**
  * @param env
  * @param topic
  * @param time the subscription time
  * @return
  * @throws IllegalAccessException
  */
  public static DataStreamSource

Re: Fwd: Complex graph-based sessionization (potential use for stateful functions)

2020-04-07 Thread m@xi
Hello Robert

Thanks to your reply I discovered the Stateful Functions which I believe is
a quite powerful tool. I have some questions:

1) As you said, "the community is currently in the process of releasing the
first Apache release of StateFun and it should hopefully be out by the end
of this week". Does this mean that it will become available in Maven
Repository? 

Because I can't find it while searching in
https://mvnrepository.com/artifact/org.apache.flink?sort=newest 
or 
use the API in my intellij project when I import the dependencies in my POM
file.

I thought of downloading the code from
https://ci.apache.org/projects/flink/flink-statefun-docs-master/, compiling
it with *mvn clean package* and then importing the produced jar file into my
intellij project as an External Library. Is this what you would recommend
for now?

2) I came across this tutorial by Stefan on stateful functions
https://www.youtube.com/watch?v=fCeHCMJXXM0 where he mentions that arbitrary
communication between nodes/functions/actors is essentially made possible by
introducing feedback loops into the Flink DAG topology (well, now it has
cycles, I guess :P) to simulate the arbitrary messaging defined in the
StateFun topology.

So the message passing is done by "feedback and tuple rerouting" and not
with MPI. Do you think (or have you tested) whether one may *efficiently*
send/receive (potentially large) state, like the graph state that is the use
case of this post? Or is it more suitable for sending control messages
between actors/functions?

Thanks a lot in advance.

Best,
Max
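For what it's worth, a minimal sketch of a StateFun Java function based on my reading of the 2.0.0 Java SDK; the package and method names should be double-checked against the released artifacts, and the function types and message contents here are made up:

import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;

public class SessionCounterFn implements StatefulFunction {

    public static final FunctionType TYPE = new FunctionType("example", "session-counter");
    // Hypothetical peer function that this one messages.
    public static final FunctionType PEER = new FunctionType("example", "peer");

    // Fault-tolerant, function-local state managed by the StateFun runtime.
    @Persisted
    private final PersistedValue<Integer> seen = PersistedValue.of("seen", Integer.class);

    @Override
    public void invoke(Context context, Object input) {
        int count = seen.getOrDefault(0) + 1;
        seen.set(count);
        // Send a small message to another logical function instance, addressed by type + id.
        context.send(new Address(PEER, String.valueOf(input)), count);
    }
}

The feedback loop in the runtime is meant for arbitrary, usually small, messages between functions; larger per-entity state such as graph fragments is typically kept local to a function in PersistedValue/PersistedTable rather than shipped around in messages, but measuring that trade-off is exactly what a benchmark on the release would show.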



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Re: flink consumes Kafka from the beginning to catch up when new computation logic is added

2020-04-07 Thread Evan
Hi 苟刚, I just looked at your Kafka consumer code. My suggestion is that after you obtain the consumer, you add
the following line: "consumer.setStartFromLatest();" and then test again.


/**
  * @param env
  * @param topic
  * @param time the subscription time
  * @return
  * @throws IllegalAccessException
  */
  public static DataStreamSource

Re: Using ROW_NUMBER in Flink SQL 1.10

2020-04-07 Thread 111
Hi,
I have filed an issue in JIRA: https://issues.apache.org/jira/browse/FLINK-17022
Best,
Xinghalo

Re: Re: flink consumes Kafka from the beginning to catch up when new computation logic is added

2020-04-07 Thread gang.gou
My Kafka version is 0.11 and my Flink version is 1.6.3. I did not explicitly configure offset auto-commit, but according to the Kafka documentation the default should be true.




On 2020/4/7 at 2:10 PM, "Evan" wrote:

Hi 苟刚, may I ask which version of Kafka you are using, and whether offset auto-commit is configured?




-- Original message --
From: "苟刚"

Re: Using ROW_NUMBER in Flink SQL 1.10

2020-04-07 Thread 111
Hi,
Sure, no problem.


Best,
xinghalo


xinghalo
xingh...@163.com
(signature customized by NetEase Mail Master)


On April 7, 2020 at 14:28, Jark Wu wrote:
Hi Xinghalo,

This looks like a codegen bug. Could you help create an issue in JIRA? Ideally attach your example.

Best,
Jark

On Tue, 7 Apr 2020 at 14:22, 111  wrote:

Hi,
I see, so they are used together.


However, after adding the condition, it complains that pruning cannot be done:

Caused by: org.apache.flink.table.api.TableException: This calc has no
useful projection and no filter. It should be removed by CalcRemoveRule.
  at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:176)
  at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

I have to force a "where 1=1" to make it work:

select * from (
  SELECT member_id, category_id, c,
         row_number() over (PARTITION BY member_id ORDER BY c) AS rn
  FROM window_v where 1=1
) where rn <= 5

Best,
Xinghalo


Re: Using ROW_NUMBER in Flink SQL 1.10

2020-04-07 Thread Jark Wu
Hi Xinghalo,

This looks like a codegen bug. Could you help create an issue in JIRA? Ideally attach your example.

Best,
Jark

On Tue, 7 Apr 2020 at 14:22, 111  wrote:

> Hi,
> I see, so they are used together.
>
>
> However, after adding the condition, it complains that pruning cannot be done:
>
> Caused by: org.apache.flink.table.api.TableException: This calc has no
> useful projection and no filter. It should be removed by CalcRemoveRule.
>   at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:176)
>   at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>
> I have to force a "where 1=1" to make it work:
>
> select * from (
>   SELECT member_id, category_id, c,
>          row_number() over (PARTITION BY member_id ORDER BY c) AS rn
>   FROM window_v where 1=1
> ) where rn <= 5
>
> Best,
> Xinghalo


Re: flink 1.9 conflict jackson version

2020-04-07 Thread aj
Hi Fanbin,

I am facing a similar kind of issue. If you have been able to resolve it,
please let me know and help me out as well:

https://stackoverflow.com/questions/61012350/flink-reading-a-s3-file-causing-jackson-dependency-issue



On Tue, Dec 17, 2019 at 7:50 AM ouywl  wrote:

> Hi Bu,
> I think you can use the mvn-shade-plugin to resolve your problem. It seems
> flink-client conflicts with one of your own jars?
>
> ouywl
> ou...@139.com
>
> 
> (signature customized by NetEase Mail Master)
>
> On 12/17/2019 08:10,Fanbin Bu
>  wrote:
>
> Hi,
>
> After I upgrade flink 1.9, I got the following error message on EMR, it
> works locally on IntelliJ.
>
> I'm explicitly declaring the dependency as
> implementation
> 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
> and I have
> implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version:
> '1.11.595'
>
>
>
> java.lang.NoSuchMethodError: 
> com.fasterxml.jackson.databind.ObjectMapper.enable([Lcom/fasterxml/jackson/core/JsonParser$Feature;)Lcom/fasterxml/jackson/databind/ObjectMapper;
>   at 
> com.amazonaws.partitions.PartitionsLoader.(PartitionsLoader.java:54)
>   at 
> com.amazonaws.regions.RegionMetadataFactory.create(RegionMetadataFactory.java:30)
>   at com.amazonaws.regions.RegionUtils.initialize(RegionUtils.java:65)
>   at 
> com.amazonaws.regions.RegionUtils.getRegionMetadata(RegionUtils.java:53)
>   at com.amazonaws.regions.RegionUtils.getRegion(RegionUtils.java:107)
>   at 
> com.amazonaws.client.builder.AwsClientBuilder.getRegionObject(AwsClientBuilder.java:256)
>   at 
> com.amazonaws.client.builder.AwsClientBuilder.setRegion(AwsClientBuilder.java:460)
>   at 
> com.amazonaws.client.builder.AwsClientBuilder.configureMutableProperties(AwsClientBuilder.java:424)
>   at 
> com.amazonaws.client.builder.AwsAsyncClientBuilder.build(AwsAsyncClientBuilder.java:80)
>   at 
> com.coinbase.util.KmsClient$.getSnowflakeUsernamePassword(KmsClient.scala:21)
>   at com.coinbase.ml.RunFlinkJob$.runBatch(RunFlinkJob.scala:94)
>   at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:38)
>   at 
> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
>   at 
> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: Using ROW_NUMBER in Flink SQL 1.10

2020-04-07 Thread 111
Hi,
I see, so they are used together.


However, after adding the condition, it complains that pruning cannot be done:

Caused by: org.apache.flink.table.api.TableException: This calc has no
useful projection and no filter. It should be removed by CalcRemoveRule.
  at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:176)
  at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

I have to force a "where 1=1" to make it work:

select * from (
  SELECT member_id, category_id, c,
         row_number() over (PARTITION BY member_id ORDER BY c) AS rn
  FROM window_v where 1=1
) where rn <= 5

Best,
Xinghalo

Re: Using ROW_NUMBER in Flink SQL 1.10

2020-04-07 Thread Jark Wu
Hi,

Your Top-N only does the ranking; it does not filter down to the top n rows. Add the where condition and try again.

Select * from (
SELECT
member_id,
 category_id,
 c,
 row_number() over (PARTITION BY member_id ORDER BY c) AS rn
FROM window_vl
) where rn <= 20;

Best,
Jark

On Tue, 7 Apr 2020 at 14:01, 111  wrote:

> Hi,
>
>
> I don't quite understand this part. I want the top-n categories per user within a window. In batch mode the usual syntax is:
> row_number() over (PARTITION BY member_id ORDER BY c) AS rn
> which gives, for each user, the categories ranked by their counts.
>
>
> If I use HOP_PROCTIME here to obtain the time attribute, then
> row_number() over (PARTITION BY member_id ORDER BY time) AS rn
> seems to rank each user's rows by the sliding time window, rather than ranking by c within each sliding window.
>
>
> Best,
> Xinghalo
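If it helps, a hedged sketch that combines the two points in this thread (rank within each window, then filter rn <= N). It assumes window_v is a registered view over the hopping-window aggregation that also exposes the window end time as a column named window_end; that column name is an assumption:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class PerWindowTopN {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // "window_v" is assumed to be a view over the hopping-window aggregation that also
        // exposes the window end time as "window_end" (the column name is an assumption).
        Table topn = tEnv.sqlQuery(
                "SELECT * FROM ("
                        + " SELECT member_id, category_id, c, window_end,"
                        + "  ROW_NUMBER() OVER (PARTITION BY member_id, window_end ORDER BY c DESC) AS rn"
                        + " FROM window_v"
                        + ") WHERE rn <= 5");
        tEnv.createTemporaryView("topn_v", topn);
    }
}

Partitioning the ROW_NUMBER by both member_id and window_end is what scopes the ranking to each sliding window rather than across windows.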


About a Flink Kafka client-id error

2020-04-07 Thread 曲洋
Hi,
We have been running Flink jobs in production stably for more than two months without any problems. Without any changes being made, the TaskManagers suddenly started dying. Looking at the logs, we found the following error:
ERROR org.apache.kafka.common.metrics.Metrics   - Error 
when removing metric from org.apache.kafka.common.metrics.JmxReporter
org.apache.kafka.common.KafkaException: Error registering mbean 
kafka.consumer:type=consumer-node-metrics,client-id=consumer-2,node-id=node--2
at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:169)
at 
org.apache.kafka.common.metrics.JmxReporter.metricRemoval(JmxReporter.java:100)
at org.apache.kafka.common.metrics.Metrics.removeMetric(Metrics.java:534)
at org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:448)
at 
org.apache.kafka.common.network.Selector$SelectorMetrics.close(Selector.java:1204)
at org.apache.kafka.common.network.Selector.close(Selector.java:368)
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:627)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.close(Consume
The problem still exists; it shows up roughly every one or two hours and is accompanied by a TaskManager dying. Some sources we found say it is a bug in the framework itself. Has anyone run into the same problem?

Re: How to merge a binlog stream and a table stream?

2020-04-07 Thread Jark Wu
Hi,

Is the merge here done with a join? If so, it will be quite costly performance-wise.

One approach is to consume the jdbc table first, and switch to kafka after it is fully consumed. This requires the
binlog to be idempotent, because part of the binlog will be processed more than once; it is not possible to switch
to the exact kafka offset.

You can also take a look at how StreamSQL does its bootstrap:
https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar

Best,
Jark


On Sun, 5 Apr 2020 at 22:48, 刘宇宝  wrote:

> Hi all,
>
> I am using Debezium to read the latest binlog from a database and write it into Kafka; for example,
> mysql_server.test.tableA has a topic "mysql_server.test.tableA". I need to implement the following logic in Flink:
>
>
>   1.  First connect to Kafka and start consuming topic "mysql_server.test.tableA", making sure the connection
> succeeds; call this binlog-stream, but pause consuming from Kafka;
>   2.  Read test.tableA into a DataStream with JDBCInputFormat; call this table-stream;
>   3.  Merge the two streams, and only start consuming binlog-stream after table-stream has been fully consumed,
> so the binlog is guaranteed to be applied *after* the snapshot of the table.
>
> The question is: how can I pause consuming binlog-stream? The approach I have come up with so far is to keep a
> global flag startBinlog in flink state, initialized to false:
>
>   binlog-stream -> waitOperator   ->   sinkOperator
>   table-stream -> notifyOperator -> sinkOperator
>
> Both streams are merged into sinkOperator. waitOperator() would block in a while loop checking the global flag;
> once table-stream is fully consumed (I don't know how to tell when that happens...), notifyOperator flips the
> global flag so that binlog-stream can continue to be consumed.
>
> But since a kafka consumer that stays blocked for a long time without acking gets disconnected, this approach
> probably won't work.
>
> Any advice on how to get around this?
>
> Thanks!
>
>


Re: flink consumes Kafka from the beginning to catch up when new computation logic is added

2020-04-07 Thread Evan
Hi 苟刚, may I ask which version of Kafka you are using, and whether offset auto-commit is configured?






-- Original message --
From: "苟刚"

upgrade flink from 1.9.1 to 1.10.0 on EMR

2020-04-07 Thread aj
Hello All,

I am running Flink on AWS EMR. Currently the latest version available on
EMR is 1.9.1, but I want to upgrade to 1.10.0. I tried to manually replace
the lib jars with the 1.10.0 versions, but this is not working. I am
getting the following exception when trying to submit a job on yarn.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "main" java.lang.IllegalArgumentException: Invalid
rule: /L
  RULE:[2:$1@$0](.*@)s/@.*///L
  DEFAULT
at
org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
at
org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
at
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
at
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at
org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
at
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
End of LogType:jobmanager.err


Please help to understand this error and how to resolve this.


-- 
Thanks & Regards,
Anuj Jain






Re: Using ROW_NUMBER in Flink SQL 1.10

2020-04-07 Thread 111
Hi,


I don't quite understand this part. I want the top-n categories per user within a window. In batch mode the usual syntax is:
row_number() over (PARTITION BY member_id ORDER BY c) AS rn
which gives, for each user, the categories ranked by their counts.


If I use HOP_PROCTIME here to obtain the time attribute, then
row_number() over (PARTITION BY member_id ORDER BY time) AS rn
seems to rank each user's rows by the sliding time window, rather than ranking by c within each sliding window.


Best,
Xinghalo