Re: RocksDB / checkpoint questions

2018-02-05 Thread Stefan Richter
Hi, you are correct that RocksDB has a „working directory“ on local disk and checkpoints + savepoints go to a distributed filesystem. - if I have 3 TaskManager I should expect more or less (depending on how the tasks are balanced) to find a third of my overall state stored on disk on each of

Re:Re: Why does jobmanager running needs slot ?

2018-02-05 Thread mingleizhang
Yes. Thanks Piotrek. Of course. So, TaskExecutor#offerSlotsToJobManager sounds confusing to me. It might be better to rename it to requestSlotsFromJobManager. I don't know whether that sounds okay. I just feel like offerSlotsToJobManager sounds strange. What do you think?

Re:Re: Why does jobmanager running needs slot ?

2018-02-05 Thread mingleizhang
Makes sense to me now. Is it a new design in FLIP-6? Rice. At 2018-02-05 17:49:05, "Piotr Nowojski" wrote: I might be wrong but I think it is the other way around and the naming of this method is correct - it does exactly what it says. TaskManager comes with some

Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
It seems so - but I’m saying this based only on the annotations from when this method was added (in the last couple of months). I’m not that familiar with those parts of the code. Piotrek > On 5 Feb 2018, at 10:51, mingleizhang wrote: > > Makes sense to me now. Is it a new

Re: RocksDB / checkpoint questions

2018-02-05 Thread Christophe Jolif
Thanks a lot for the details Stefan. -- Christophe On Mon, Feb 5, 2018 at 11:31 AM, Stefan Richter wrote: > Hi, > > you are correct that RocksDB has a „working directory“ on local disk and > checkpoints + savepoints go to a distributed filesystem. > > - if I have

Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
Yes, but this issue is still a part of the FLIP-6 work. Piotrek > On 5 Feb 2018, at 11:01, mingleizhang wrote: > > I found a website: https://issues.apache.org/jira/browse/FLINK-4360 > implemented this before. > >

Re: Kafka and parallelism

2018-02-05 Thread Christophe Jolif
Thanks. It helps indeed. I guess the last point it does not explicitly answer is "does just creating a Kafka consumer reading from multiple partitions set the parallelism to the number of partitions". But reading between the lines I think the answer is clearly no. You have to set your parallelism

Re: Task Manager detached under load

2018-02-05 Thread Till Rohrmann
Hi, this sounds like a serious regression wrt Flink 1.3.2 and we should definitely find out what's causing this problem. From what I see in the logs, the following happens: For some time the JobManager seems to no longer receive heartbeats from the TaskManager. This could be, for example,

Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
I might be wrong but I think it is the other way around and the naming of this method is correct - it does exactly what it says. A TaskManager comes with some predefined task slots and it is the one that is offering them to a JobManager. The JobManager can use those slot offers to (later!) schedule

Re:Re: Why does jobmanager running needs slot ?

2018-02-05 Thread mingleizhang
I found that https://issues.apache.org/jira/browse/FLINK-4360 implemented this before. Rice. At 2018-02-05 17:56:49, "Piotr Nowojski" wrote: It seems so - but I’m saying this based only on the annotations from when this method was added (in the last couple of

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-05 Thread Fabian Hueske
In case of a failure, Flink rolls back the job to the last checkpoint and reprocesses all data since that checkpoint. Also the BucketingSink will truncate a file to the position of the last checkpoint if the file system supports truncate. If not, it writes a file with the valid length and starts a

Fwd: Global window keyBy

2018-02-05 Thread miki haiat
Yes. Another question is how I can clear non-triggered events after a period of time. Is there a way to configure some "timeout"? Thanks a lot. On Mon, Feb 5, 2018 at 10:40 AM, Piotr Nowojski wrote: > Hi, > > FIRE_AND_PURGE triggers

Rebalance to subtasks in same TaskManager instance

2018-02-05 Thread johannes.barn...@clarivate.com
Hi, I have a streaming topology with source parallelism of M and a target operator parallelism of N. For optimum performance I have found that I need to choose M and N independently. Also, the source subtasks do not all produce the same number of records and therefore I have to rebalance to the

Re: Kafka and parallelism

2018-02-05 Thread Tzu-Li (Gordon) Tai
Yes, the answer to that would be no. If you do not explicitly set a parallelism for the consumer, the parallelism by default will be whatever the parallelism of the job is, and is independent of how many Kafka partitions there are. Cheers, Gordon On 5 February 2018 at 11:42:21 AM, Christophe

Re: ML and Stream

2018-02-05 Thread Fabian Hueske
Hi Christophe, it is true that FlinkML only targets batch workloads. Also, there has not been any development for a long time. In March last year, a discussion was started on the dev mailing list about different machine learning features for stream processing [1]. One result of this discussion

Re: Flink not writing last few elements to disk

2018-02-05 Thread Piotr Nowojski
Hi, FileProcessingMode.PROCESS_CONTINUOUSLY processes the file continuously - the stream will not end. A simple `writeAsCsv(…)`, on the other hand, only flushes the output file at the end of the stream (see `OutputFormatSinkFunction`). You can either use `PROCESS_ONCE` mode or use a more advanced data sink: -

Re: Reduce parallelism without network transfer.

2018-02-05 Thread Piotr Nowojski
Hi, It should work like this out of the box if you use the rescale method: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#physical-partitioning
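
To make the difference between rescale and rebalance concrete, here is a minimal plain-Java sketch of which downstream subtasks each upstream subtask sends to. It is not Flink code: the class and method names are hypothetical, and it only covers the case where one parallelism is a multiple of the other, as described in the linked documentation. Whether the resulting connections actually stay local still depends on how the subtasks are placed in slots.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; these names do not exist in Flink.
final class RescaleSketch {

    // rescale: each upstream subtask talks to a fixed subset of downstream subtasks.
    // Assumption: one parallelism is a multiple of the other.
    static List<Integer> rescaleTargets(int upstreamIndex, int upstream, int downstream) {
        if (upstream % downstream != 0 && downstream % upstream != 0) {
            throw new IllegalArgumentException("sketch only covers multiples");
        }
        List<Integer> targets = new ArrayList<>();
        if (upstream >= downstream) {
            // Several upstream subtasks share one downstream subtask.
            targets.add(upstreamIndex / (upstream / downstream));
        } else {
            // One upstream subtask fans out to a contiguous group of downstream subtasks.
            int fanOut = downstream / upstream;
            for (int k = 0; k < fanOut; k++) {
                targets.add(upstreamIndex * fanOut + k);
            }
        }
        return targets;
    }

    // rebalance: every upstream subtask round-robins over all downstream subtasks.
    static List<Integer> rebalanceTargets(int downstream) {
        List<Integer> targets = new ArrayList<>();
        for (int d = 0; d < downstream; d++) {
            targets.add(d);
        }
        return targets;
    }

    public static void main(String[] args) {
        // Reducing parallelism from 4 to 2.
        for (int i = 0; i < 4; i++) {
            System.out.println("upstream " + i
                    + " rescale -> " + rescaleTargets(i, 4, 2)
                    + ", rebalance -> " + rebalanceTargets(2));
        }
    }
}
```

With rescale, upstream subtasks 0 and 1 feed downstream subtask 0, while 2 and 3 feed downstream subtask 1. With rebalance, every upstream subtask feeds both downstream subtasks, so records generally have to cross TaskManager boundaries.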

Re: Global window keyBy

2018-02-05 Thread Piotr Nowojski
Hi, FIRE_AND_PURGE triggers a call to `org.apache.flink.api.common.state.State#clear()`, which "Removes the value mapped under the current key.". So other keys should remain unmodified. I hope this solves your problem/question? Piotrek > On 4 Feb 2018, at 15:39, miki haiat

Re: Kafka and parallelism

2018-02-05 Thread Tzu-Li (Gordon) Tai
Hi Christophe, You can set the parallelism of the FlinkKafkaConsumer independently of the total number of Kafka partitions (across all subscribed streams, including newly created streams that match a subscribed pattern). The consumer deterministically assigns each partition to a single
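
As a rough illustration of why the consumer parallelism can be chosen independently of the partition count, the following plain-Java sketch assigns every partition to exactly one subtask. The modulo rule and all names here are assumptions made for the example; the real connector uses its own deterministic assignment, which may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the Flink Kafka connector's code.
final class PartitionAssignmentSketch {

    // Assumed rule for the sketch: partition index modulo consumer parallelism.
    static int subtaskFor(int partition, int parallelism) {
        return partition % parallelism;
    }

    // Groups partition indices by the subtask that would read them.
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(subtaskFor(p, parallelism)).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // Fewer subtasks than partitions: some subtasks read several partitions.
        System.out.println(assign(5, 2)); // [[0, 2, 4], [1, 3]]
        // More subtasks than partitions: the extra subtasks stay idle.
        System.out.println(assign(2, 4)); // [[0], [1], [], []]
    }
}
```

Either way each partition has exactly one reader, which is why the number of partitions does not need to dictate the parallelism.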

Re: Getting Key from keyBy() in ProcessFunction

2018-02-05 Thread Piotr Nowojski
I think it’s currently not easily possible, however it might be a valid suggestion to add an `OnTimerContext#getCurrentKey()` method. Besides using ValueState as you discussed before, as a kind of workaround you could copy and modify KeyedProcessOperator to suit your needs, but this would be

Re: queryable state API

2018-02-05 Thread Fabian Hueske
Hi Maciek, AFAIK, there is some ongoing work to integrate queryable state with the new FLIP-6 mode. Maybe Kostas (in CC) who has worked on the queryable state API can help to answer your questions. Best, Fabian 2018-02-01 9:19 GMT+01:00 Maciek Próchniak : > Hello, > > Currently

Re: Reading bounded data from Kafka in Flink job

2018-02-05 Thread Fabian Hueske
Hi Hayden, as far as I know, an end offset is not supported by Flink's Kafka consumer. You could extend Flink's consumer. As you said, there is already code to set the starting offset (per partition), so you might be able to just piggyback on that. Gordon (in CC) who has worked a lot on the

Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
org.apache.flink.runtime.jobmaster.JobMaster#offerSlots is the receiver side of an RPC call that is initiated on the sender side: org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager. In other words, JobMasterGateway.offerSlots is called by a TaskManager and it is a

Re: ML and Stream

2018-02-05 Thread Christophe Jolif
Fabian, Ok thanks for the update. Meanwhile I was looking at how I could still leverage the current FlinkML API, but as far as I can see, it lacks the ability to persist its own models? So even for pure batch it prevents running your (once built) model in several jobs? Or am I missing

how to match external checkpoints with jobs during recovery

2018-02-05 Thread xiatao123
The external checkpoints are named in the format checkpoint_metadata-0057, so I have no idea which job a given checkpoint metadata file belongs to if I have multiple jobs running at the same time. If a job fails unexpectedly, I need to know which checkpoints belong to the failed job. Is there an API

Re: Reduce parallelism without network transfer.

2018-02-05 Thread Kien Truong
Thanks Piotr, it works. May I ask why the default behavior when reducing parallelism is rebalance, and not rescale? Regards, Kien On Feb 5, 2018, 15:28, at 15:28, Piotr Nowojski wrote: >Hi, > >It should work like this out of the box if you use

Re: Joining data in Streaming

2018-02-05 Thread Steven Wu
There is also a discussion of side input https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API I would load the smaller data set as static reference data set. Then you can just do single source streaming of the larger data set. On Wed, Jan 31, 2018 at 1:09 AM,

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-05 Thread xiaobin yan
Hi, you've got a point. I saw that method, but how can I make sure that the checkpoints of all the subtasks are finished? I can only write the _SUCCESS file at that time. Best, Ben > On 5 Feb 2018, at 6:34 PM, Fabian Hueske wrote: > > In case of a failure, Flink

Re: Task Manager detached under load

2018-02-05 Thread Ashish Pokharel
Hi Till, Thanks for the detailed response. I will try to gather some of this information during the week and follow up. -- Ashish > On Feb 5, 2018, at 5:55 AM, Till Rohrmann wrote: > > Hi, > > this sounds like a serious regression wrt Flink 1.3.2 and we should > definitely

Will that be a problem if POJO key type didn't override equals?

2018-02-05 Thread Tony Wei
Hi all, I have defined a POJO class that overrides Object#hashCode and used it in keyBy(). The pipeline looks good (i.e. no exception telling me it is an unsupported key type), but I'm afraid that it will lead to a problem where elements that I think have the same key will not get the same state

Testing flink class loading

2018-02-05 Thread Data Engineer
I am trying to run the ClassLoaderTestProgram on Flink. 1. I have started Flink in local mode with the following command: bin/jobmanager.sh start local 2. I ran the ClassLoaderTestProgram jar: bin/flink run ClassLoaderTestProgram.jar --resolve-first child --output out.txt I get a

Re: Will that be a problem if POJO key type didn't override equals?

2018-02-05 Thread Tony Wei
Hi Timo, Thanks for your response. I will implement equals for my POJO directly. Is that okay, instead of wrapping it in another class? Furthermore, I want to migrate the state from the previous job. Will it lead to state loss? I run my job on Flink 1.4.0. I used RocksDBStateBackend and only

Re: Will that be a problem if POJO key type didn't override equals?

2018-02-05 Thread Timo Walther
Hi Tony, not having a proper equals() method might work for a keyBy() (partitioning operation) but it can lead to unexpected side effects when dealing with state. If not now, then maybe in the future. For example, heap-based state uses hash table data structures such that your key might
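
The hash table concern can be reproduced with an ordinary `java.util.HashMap`, used here only as a stand-in for a heap-based state table; it is not Flink's state backend code, and the key classes are hypothetical. A key that overrides hashCode() but inherits Object#equals lands in the right bucket but never matches a second, logically equal instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical key classes for illustration; not taken from the thread.
final class KeyEqualsSketch {

    // Overrides hashCode() only: equals() falls back to reference identity.
    static final class HashOnlyKey {
        final String id;
        HashOnlyKey(String id) { this.id = id; }
        @Override public int hashCode() { return Objects.hash(id); }
    }

    // Overrides both methods, so two instances with the same id are the same key.
    static final class ProperKey {
        final String id;
        ProperKey(String id) { this.id = id; }
        @Override public int hashCode() { return Objects.hash(id); }
        @Override public boolean equals(Object o) {
            return o instanceof ProperKey && Objects.equals(id, ((ProperKey) o).id);
        }
    }

    // Stores a value under one instance, then looks it up with a new, logically equal instance.
    static Integer lookupWithHashOnlyKey() {
        Map<HashOnlyKey, Integer> state = new HashMap<>();
        state.put(new HashOnlyKey("user-1"), 42);
        return state.get(new HashOnlyKey("user-1"));
    }

    static Integer lookupWithProperKey() {
        Map<ProperKey, Integer> state = new HashMap<>();
        state.put(new ProperKey("user-1"), 42);
        return state.get(new ProperKey("user-1"));
    }

    public static void main(String[] args) {
        System.out.println("hash-only key lookup: " + lookupWithHashOnlyKey()); // null
        System.out.println("proper key lookup: " + lookupWithProperKey());      // 42
    }
}
```

The first lookup returns null even though both keys carry the same id, which is the "same key, different state" symptom raised in the question.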

Re: ML and Stream

2018-02-05 Thread Fabian Hueske
That's correct. It's not possible to persist data in memory across jobs in Flink's batch API. Best, Fabian 2018-02-05 18:28 GMT+01:00 Christophe Jolif : > Fabian, > > Ok thanks for the update. Meanwhile I was looking at how I could still > leverage current FlinkML API, but as