Re: Iterating over state entries

2018-02-19 Thread Ken Krugler
Hi Till, > On Feb 19, 2018, at 8:14 AM, Till Rohrmann wrote: > > Hi Ken, > > just for my clarification, the `RocksDBMapState#entries` method does not > satisfy your requirements? This method does not allow you to iterate across > different keys of your keyed stream of

Re: Iterating over state entries

2018-02-19 Thread Ken Krugler
Hi Fabian, > I'd like to clarify what I said before. > > By using MapState you mainly gain two things: > - position access by index > - the full list does not need to be deserialized to read values (which is how > ListState works). > > Point access should obviously be done by get(index). > However,

Re: A "per operator instance" window all ?

2018-02-19 Thread Xingcan Cui
Hi Julien, you could use the OperatorState to cache the data in a window and the last time your window fired. Then you check the ctx.timerService().currentProcessingTime() in

How to find correct "imports"

2018-02-19 Thread Esa Heikkinen
Hi, I am quite new to Flink and Scala. I have had a bit of trouble finding the correct "imports". What would be the best way to find them? For example, the imports for StreamTableEnvironment and CsvTableSource. And how do I know if I should put something in pom.xml? Esa

Re: Retrieve written records of a sink after job

2018-02-19 Thread Fabian Hueske
Hi Flavio, Not sure if I would add this functionality to the sinks. You could also add a MapFunction with a counting Accumulator right before each sink. Best, Fabian 2018-02-14 14:11 GMT+01:00 Flavio Pompermaier : > So, if I'm not wrong, the right way to do this is using
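
Fabian's suggestion can be modeled without any Flink dependencies as a pass-through mapper that increments a counter while forwarding records unchanged. This is a minimal sketch only: in a real Flink job the counter would be a registered Accumulator (e.g. an IntCounter) inside a MapFunction placed right before the sink; the class and names below are illustrative assumptions.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Pass-through mapper that counts records flowing to a sink.
 *  Plain-Java stand-in for a Flink MapFunction with an Accumulator. */
public class CountingMap<T> implements Function<T, T> {
    private final AtomicLong counter = new AtomicLong();

    @Override
    public T apply(T record) {
        counter.incrementAndGet(); // in Flink this would be accumulator.add(1)
        return record;             // forward the record unchanged to the sink
    }

    public long count() { return counter.get(); }

    public static void main(String[] args) {
        CountingMap<String> counting = new CountingMap<>();
        List<String> out = List.of("a", "b", "c").stream()
                .map(counting)
                .collect(Collectors.toList());
        System.out.println(counting.count() + " records written: " + out);
    }
}
```

After the job finishes, the accumulator value would be read from the JobExecutionResult, which is where Flink surfaces accumulator results.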

Re: Only a single message processed

2018-02-19 Thread Niclas Hedhman
(Sorry for the incoherent order and ramblings. I am writing this as I am trying to sort out what is going on...) 1. It is the first message to be processed in the Kafka topic. If I set the offset manually, it will pick up the message at that point, process it, and ignore all following messages.

Re: CoProcess() VS union.Process() & Timers in them

2018-02-19 Thread Fabian Hueske
Changing the parallelism works in Flink by taking a savepoint, shutting down the job, and restarting it from the savepoint with another parallelism. The rescale() operator defines how records are exchanged between two operators with different parallelism. Rescale prefers local data exchange over

Re: Manipulating Processing elements of Network Buffers

2018-02-19 Thread Till Rohrmann
Hi Max, the network buffer order is quite important at the moment, because the network stream does not only transport data but also control events such as the checkpoint barriers. In order to guarantee that you don't lose data in case of a failure it is (at the moment) strictly necessary that

Re: Need to understand the execution model of the Flink

2018-02-19 Thread Fabian Hueske
Hi, this works as follows. - Table API and SQL queries are translated into regular DataSet jobs (assuming you are running in a batch ExecutionEnvironment). - A query is translated into a sequence of DataSet operators when you 1) transform the Table into a DataSet or 2) write it to a TableSink.

Re: A "per operator instance" window all ?

2018-02-19 Thread Julien
Hello, I've already tried to key my stream with "resourceId.hashCode%parallelism" (with parallelism of 4 in my example). So all my keys will be either 0,1, 2 or 3. I can then benefit from a time window on this keyed stream and do only 4 queries to my external system. But it is not well
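
Julien's workaround — collapsing many resource ids onto a small number of artificial keys — can be sketched as a simple modulo bucketing. Note a caveat (which Ken's reply in this thread addresses): Flink hashes these artificial keys again into key groups, so 4 bucket values are not guaranteed to land on 4 distinct subtasks. The code below only models the bucketing step itself; names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of keying by resourceId.hashCode() % parallelism. */
public class KeyBucketing {
    static int bucket(String resourceId, int parallelism) {
        // Math.floorMod avoids negative buckets when hashCode() is negative
        return Math.floorMod(resourceId.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        List<Integer> buckets = new ArrayList<>();
        for (String id : List.of("sensor-1", "sensor-2", "sensor-3")) {
            buckets.add(bucket(id, parallelism)); // each value is in [0, parallelism)
        }
        System.out.println(buckets);
    }
}
```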

Re: Retrieve written records of a sink after job

2018-02-19 Thread Flavio Pompermaier
Hi Fabian, thanks for the feedback. Right now I'm doing exactly as you said. Since I was seeing this as a useful API extension I just proposed this addition and asked for feedback. Of course, it doesn't make much sense if I'm the only one asking for it :) Best, Flavio On Mon, Feb 19, 2018

Re: Architecture question

2018-02-19 Thread Fabian Hueske
Hi, What you are looking for is a BucketingSink that works on event time (the timestamp is encoded in your data). AFAIK, Flink's BucketingSink has been designed to work in processing time, but you can implement a Bucketer that creates buckets based on a timestamp in the data. You might need to

Re: Only a single message processed

2018-02-19 Thread Fabian Hueske
Hi Niclas, Glad that you got it working! Thanks for sharing the problem and solution. Best, Fabian 2018-02-19 9:29 GMT+01:00 Niclas Hedhman : > > (Sorry for the incoherent order and ramblings. I am writing this as I am > trying to sort out what is going on...) > > 1. It is

Discarding bad data in Stream

2018-02-19 Thread Niclas Hedhman
Hi again, something that I don't find (easily) in the documentation is what the recommended method is to discard data from the stream. On one hand, I could always use flatMap(), even if it is "per message" since that allows me to return zero or one objects. DataStream stream =
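
The zero-or-one behavior of flatMap() that Niclas describes can be modeled in plain Java, with a Consumer standing in for Flink's Collector. This is a hedged sketch of the pattern, not Flink API code; the parsing logic is an invented example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a Flink FlatMapFunction that discards unparsable records. */
public class DiscardingFlatMap {
    // Emits a parsed Integer for good records, and nothing for bad ones.
    static void flatMap(String raw, Consumer<Integer> out) {
        try {
            out.accept(Integer.parseInt(raw.trim())); // good record: emit once
        } catch (NumberFormatException e) {
            // bad record: emit nothing, i.e. it is dropped from the stream
        }
    }

    public static void main(String[] args) {
        List<Integer> good = new ArrayList<>();
        for (String raw : List.of("1", "oops", "3")) {
            flatMap(raw, good::add);
        }
        System.out.println(good); // [1, 3]
    }
}
```

A plain filter() works too when no transformation is needed; flatMap() avoids parsing twice when the validity check is itself the expensive transformation.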

Re: How to find correct "imports"

2018-02-19 Thread Niclas Hedhman
It is called "declared dependencies", and Flink has a huge number of artifacts, and they have also changed name over time. But Maven Central provides a search facility. Try http://search.maven.org/#search%7Cga%7C5%7Cg%3Aorg.apache.flink%20AND%20v%3A1.4.0 And it will give you all artifacts from

Re: How to find correct "imports"

2018-02-19 Thread Timo Walther
Hi Esa, the easiest and recommended way is: - Create your Flink project with the provided quickstart scripts [1] - Visit the documentation about a feature you want to use. E.g. for the Table & SQL API [2] Usually it is described which modules you need. I hope this helps. Regards, Timo

Stopping a kafka consumer gracefully (no losing of inflight events, StoppableFunction)

2018-02-19 Thread Bart Kastermans
In https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html it is shown that for gracefully stopping a job you need to implement the StoppableFunction interface. This appears not (yet) to be implemented for Kafka consumers. Am I missing something, or is there a different way to

Re: Need to understand the execution model of the Flink

2018-02-19 Thread Darshan Singh
Thanks Fabian for such a detailed explanation. I am using a dataset in between so I guess the CSV is read once. Now to my real issue: I have 6 task managers, each having 4 cores, and I have 2 slots per task manager. Now my CSV file is just 1 GB and I create a table, transform it to a dataset, and then run 15

Re: Correlation between number of operators and Job manager memory requirements

2018-02-19 Thread Shailesh Jain
Actually, there are too many hyperparameters to experiment with, that is why I'm trying to understand if there is any particular way in which a cluster could be benchmarked. Another strange behaviour I am observing is: Delaying the operator creation (by distributing the operators across jobs, and

Re: Discarding bad data in Stream

2018-02-19 Thread Fabian Hueske
Hi Niclas, I'd either add a Filter to directly discard bad records. That should make the behavior explicit. If you need to do complex transformations that you don't want to do twice, the FlatMap approach would be the most efficient. If you'd like to keep the bad records, you can implement a

Managed State Custom Serializer with Avro

2018-02-19 Thread Niels
Hi all, I'm currently trying to use Avro in order to evolve our data present in Flink's Managed State. I've extended the TypeSerializer class successfully for this purpose, but still have issues using Schema Evolution. *The problem:* When we try to read data (deserialize from savepoint) with a

Re: Need to understand the execution model of the Flink

2018-02-19 Thread Fabian Hueske
Hi, that's a difficult question without knowing the details of your job. A NoSpaceLeftOnDevice error occurs when a file system is full. This can happen if: - A Flink algorithm writes to disk, e.g., an external sort or the hash table of a hybrid hash join. This can happen for GroupBy, Join,

Re: [ANNOUNCE] Apache Flink 1.4.1 released

2018-02-19 Thread Stephan Ewen
Great, thanks a lot for being the release manager, Gordon! On Fri, Feb 16, 2018 at 12:54 AM, Hao Sun wrote: > This is great! > > On Thu, Feb 15, 2018 at 2:50 PM Bowen Li wrote: > >> Congratulations everyone! >> >> On Thu, Feb 15, 2018 at 10:04 AM, Tzu-Li

Re: Unexpected hop start & end timestamps after stream SQL join

2018-02-19 Thread Fabian Hueske
Hi Juho, sorry for the late response. I found time to look into this issue. I agree, that the start and end timestamps of the HOP window should be 1 hour apart from each other. I tried to reproduce the issue, but was not able to do so. Can you maybe open a JIRA and provide a simple test case

Window with recent messages

2018-02-19 Thread Krzysztof Białek
Hi, My app is calculating Company scores from Ratings given by users. Only ratings from the last 90 days should be considered. 1. Is it possible to construct a window processing ratings from the last 90 days? I've started with *misusing* countWindow but this solution looks ugly to me. ratingStream
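
One common alternative to a count window here is keeping the recent ratings in keyed state and pruning anything older than the 90-day horizon on each new element (in Flink this would live in a ProcessFunction with ListState/MapState and timers). The sketch below models only that pruning logic in plain Java; the class and field names are illustrative assumptions.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;

/** Sketch of "ratings from the last 90 days" state maintenance. */
public class RollingRatings {
    static final long HORIZON_MS = Duration.ofDays(90).toMillis();
    // (timestampMs, rating) pairs in arrival order
    private final Deque<long[]> ratings = new ArrayDeque<>();

    void add(long timestampMs, long rating) {
        ratings.addLast(new long[]{timestampMs, rating});
        // prune everything that fell out of the 90-day horizon
        while (!ratings.isEmpty() && ratings.peekFirst()[0] < timestampMs - HORIZON_MS) {
            ratings.removeFirst();
        }
    }

    double score() {
        return ratings.stream().mapToLong(r -> r[1]).average().orElse(0.0);
    }

    public static void main(String[] args) {
        RollingRatings r = new RollingRatings();
        long day = Duration.ofDays(1).toMillis();
        r.add(0, 1);          // this rating will fall out of the horizon
        r.add(100 * day, 5);  // 100 days later: the first rating is pruned
        System.out.println(r.score()); // 5.0
    }
}
```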

Re: Question about the need of consumer groups from kafka

2018-02-19 Thread Till Rohrmann
Hi Ricardo, could you please give a bit more details what you mean with "not using its own mechanism"? Flink's Kafka connector uses the Kafka consumer and producer (to some extent) API to talk to Kafka. The consumer groups are a central concept of Kafka and as such, the Flink Kafka connector has

Re: Discarding bad data in Stream

2018-02-19 Thread Niclas Hedhman
Thanks Fabian, I have seen Side Outputs and OutputTags but not fully understood the mechanics yet. In my case, I don't need to keep bad records... And I think I will end up with flatMap() after all, it just becomes an internal documentation issue to provide relevant information... Thanks for your

Re: Iterating over state entries

2018-02-19 Thread Fabian Hueske
Hi Ken, I'd like to clarify what I said before. By using MapState you mainly gain two things: - position access by index - the full list does not need to be deserialized to read values (which is how ListState works). Point access should obviously be done by get(index). However, iterating over the list

Re: Iterating over state entries

2018-02-19 Thread Till Rohrmann
Hi Ken, just for my clarification, the `RocksDBMapState#entries` method does not satisfy your requirements? This method does not allow you to iterate across different keys of your keyed stream of course. But it should allow you to iterate over the different entries for a given key of your keyed

Re: Concurrent modification Exception when submitting multiple jobs

2018-02-19 Thread Till Rohrmann
Hi Vinay, could you try to create a dedicated RemoteEnvironment for each parallel thread. I think that the StreamExecutionEnvironment is not thread safe and should, thus, not be shared across multiple threads if that's the case. Getting a glimpse at your code would also help to further

Re: Regarding Task Slots allocation

2018-02-19 Thread Till Rohrmann
Hi Vinay, try to set the parallelism to 2 for the job you are executing via the RemoteExecutionEnvironment. Where have you specified the number of TaskManager slots? In the flink-conf.yaml file which you used to deploy the remote Flink cluster? Cheers, Till On Fri, Feb 16, 2018 at 7:14 PM,

Re: A "per operator instance" window all ?

2018-02-19 Thread Ken Krugler
Hi Julien, I'd run into a similar situation, where I need to have a keyed stream, but I want (effectively) one key per task. It’s possible to generate keys that will get distributed as you need, though it does require making assumptions about how Flink generates hashes/key groups. And once
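
Ken's approach — searching for synthetic key values that land on distinct subtasks — can be sketched as a brute-force loop over candidate keys. Important caveat: Flink's real assignment is MathUtils.murmurHash(key.hashCode()) mapped to a key group and then to an operator index; the hash used below is a simplified stand-in, so this is only a model of the search, not code that matches real Flink routing. The keyGroup-to-operator formula mirrors Flink's keyGroup * parallelism / maxParallelism mapping.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: brute-force search for one synthetic key per operator instance. */
public class KeySearch {
    // Stand-in for Flink's real assignment (murmur hash of key.hashCode()).
    static int simplifiedOperatorFor(int key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(Integer.hashCode(key), maxParallelism);
        return keyGroup * parallelism / maxParallelism; // keyGroup -> operator index
    }

    /** Search upward from 0 until every operator index has a key. */
    static Map<Integer, Integer> keysPerOperator(int maxParallelism, int parallelism) {
        Map<Integer, Integer> operatorToKey = new HashMap<>();
        for (int key = 0; operatorToKey.size() < parallelism; key++) {
            operatorToKey.putIfAbsent(
                    simplifiedOperatorFor(key, maxParallelism, parallelism), key);
        }
        return operatorToKey;
    }

    public static void main(String[] args) {
        // one synthetic key per subtask, for maxParallelism 128 and parallelism 4
        System.out.println(keysPerOperator(128, 4));
    }
}
```

With the real murmur hash substituted in, the same loop finds keys that Flink routes to each subtask, at the cost of depending on Flink internals that may change between versions.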

Re: A "per operator instance" window all ?

2018-02-19 Thread Till Rohrmann
Hi Julien, at the moment Flink only supports parallel windows which are keyed. What you would need is something like a per-partition window which is currently not supported. The problem with that is that it is not clear how to rescale a per-partition window because it effectively means that you

Re: Stopping a kafka consumer gracefully (no losing of inflight events, StoppableFunction)

2018-02-19 Thread Till Rohrmann
Hi Bart, you're right that Flink currently does not support a graceful stop mechanism for the Kafka source. The community has already a good idea how to solve it in the general case and will hopefully soon add it to Flink. Concerning the StoppableFunction: This interface was introduced quite

Re: Regarding BucketingSink

2018-02-19 Thread Till Rohrmann
Hi Vishal, the pending files should indeed get finalized eventually. This happens on a checkpoint complete notification. Thus, what you report does not seem right. Maybe Aljoscha can shed a bit more light on the problem. In order to further debug the problem, it would be really helpful to get

Re: Regarding BucketingSink

2018-02-19 Thread Aljoscha Krettek
Hi, Sorry for the confusion. The framework (Flink) currently does not do any cleanup of pending files, yes. Best, Aljoscha > On 19. Feb 2018, at 17:01, Vishal Santoshi wrote: > > >> You should only have these dangling pending files after a failure-recovery > >>

sink with BucketingSink to S3 files override

2018-02-19 Thread galantaa
Hey, I have some kind of a concurrency problem with the Bucketing sink when I write to S3. I use the AvroKeyValueSinkWriter. The problem is that when I send events that are supposed to be written to the same directory, but to a different part file (because of different event types), the files override each

Re: Managed State Custom Serializer with Avro

2018-02-19 Thread Till Rohrmann
Hi Niels, which version of Flink are you using? Currently, Flink does not support to upgrade the TypeSerializer itself, if I'm not mistaken. As you've described, it will try to use the old serializer stored in the checkpoint stream to restore state. I've pulled Gordon into the conversation who

Re: Managed State Custom Serializer with Avro

2018-02-19 Thread Niels Denissen
Hi Till, Thanks for the quick reply, I'm using 1.3.2 atm. Cheers, Niels On Feb 19, 2018 19:10, "Till Rohrmann" wrote: > Hi Niels, > > which version of Flink are you using? Currently, Flink does not support to > upgrade the TypeSerializer itself, if I'm not mistaken. As

Re: Need to understand the execution model of the Flink

2018-02-19 Thread Darshan Singh
Thanks, is there a metric or other way to know how much space each task/job is taking? Does the execution plan have these details? Thanks On Mon, Feb 19, 2018 at 10:54 AM, Fabian Hueske wrote: > Hi, > > that's a difficult question without knowing the details of your job. > A

Re: Regarding BucketingSink

2018-02-19 Thread Aljoscha Krettek
Hi, The BucketingSink does not clean up pending files on purpose. In a distributed setting, and especially with rescaling of Flink operators, it is sufficiently hard to figure out which of the pending files you actually can delete and which of them you have to leave because they will get moved

Re: Regarding BucketingSink

2018-02-19 Thread Vishal Santoshi
>> You should only have these dangling pending files after a failure-recovery cycle, as you noticed. My suggestion would be to periodically clean up older pending files. A little confused. Is that what the framework should do, or us as part of some cleanup job ? On Mon, Feb 19, 2018 at 10:47

Re: Correlation between number of operators and Job manager memory requirements

2018-02-19 Thread Till Rohrmann
Hi Shailesh, my question would be where do you see the OOM happening? Does it happen on the JM or the TM. The memory requirements for each operator strongly depend on the operator and it is hard to give a general formula for that. It mostly depends on the user function. Flink itself should not

Re: Regarding BucketingSink

2018-02-19 Thread Vishal Santoshi
That is fine, as long as Flink assures at-least-once semantics? If the contents of a .pending file are assured, through the turbulence (restarts etc.), to end up in another file, then anything starting with an "_" underscore will by default be ignored by Hadoop (Hive or MR etc.). On Mon, Feb 19, 2018 at