Re: Does Flink allow for encapsulation of transformations?

2016-06-09 Thread Ser Kho
Chesnay: I have two simple questions related to the previous ones about encapsulation of transformations. Question 1: I have tried to extend my code using your suggestions and came up with a small concern. First, your code: public static void main(String[] args) throws Exception {
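
A minimal, hedged sketch of the encapsulation pattern this thread is about (the names and data below are made up, not Chesnay's actual code): the transformation lives in its own static method, so main() only wires the pieces together.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class EncapsulationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Integer> input = env.fromElements(1, 2, 3);

            // the transformation is encapsulated in a reusable, testable method
            DataSet<Integer> doubled = doubleValues(input);

            doubled.print();
        }

        static DataSet<Integer> doubleValues(DataSet<Integer> in) {
            return in.map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            });
        }
    }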

Re: Data Source Generator emits 4 instances of the same tuple

2016-06-09 Thread Biplob Biswas
Yes, thanks a lot. Also, the fact that I was using ParallelSourceFunction was problematic. So, as suggested by Fabian and Robert, I used SourceFunction and then, in the Flink job, I set the output of the map to a parallelism of 4 to get the desired result. Thanks again.
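
A short sketch of the fix described above, with made-up data: a plain (non-parallel) SourceFunction guarantees a single source instance, while the map afterwards still runs with parallelism 4.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class SingleSourceJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // SourceFunction (not ParallelSourceFunction): exactly one source instance,
            // so every record is emitted exactly once.
            DataStream<String> lines = env.addSource(new SourceFunction<String>() {
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<String> ctx) throws Exception {
                    for (int i = 0; running && i < 100; i++) {
                        ctx.collect("record-" + i);
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            });

            // downstream work is still parallel
            lines.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            }).setParallelism(4).print();

            env.execute("non-parallel source, parallel map");
        }
    }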

Re: NotSerializableException

2016-06-09 Thread Stephan Ewen
You can also make the KeySelector a static inner class. That should work as well. On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh wrote: > Thank you Aljoscha and Fabian for your replies. > > @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm > afraid this
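
A hedged sketch of Stephan's suggestion (the record type and its fields are assumptions; only the static-inner-class shape matters): a static KeySelector holds no hidden reference to the enclosing object, so serializing it does not drag the outer instance along.

    import org.apache.flink.api.java.functions.KeySelector;

    public class RecordsFilterer {

        // hypothetical record type standing in for the actual one
        public static class MyRecord implements java.io.Serializable {
            public String id;
            public MyRecord() {}
            public MyRecord(String id) { this.id = id; }
        }

        // static => no reference to the outer RecordsFilterer instance
        public static class IdSelector implements KeySelector<MyRecord, String> {
            @Override
            public String getKey(MyRecord record) {
                return record.id;
            }
        }

        // usage sketch: first.coGroup(second).where(new IdSelector()).equalTo(new IdSelector())...
    }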

Re: Using Flink watermarks and a large window state for scheduling

2016-06-09 Thread Josh
Ok, thanks Aljoscha. As an alternative to using Flink to maintain the schedule state, I could take the (e, t2) stream and write to an external key-value store with a bucket for each minute. Then have a separate service which polls the key-value store every minute and retrieves the current bucket,
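
For illustration only, a tiny sketch of the per-minute bucketing Josh describes (the key format is an assumption): map the scheduled time t2 to a bucket key that an external key-value store could use.

    public class MinuteBuckets {
        // t2 assumed to be an epoch timestamp in milliseconds
        public static String bucketKey(long t2Millis) {
            long minute = t2Millis / 60_000L;     // truncate to the minute
            return "schedule-bucket-" + minute;   // key under which (e, t2) entries are appended
        }
    }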

Join two streams using a count-based window

2016-06-09 Thread Nikos R. Katsipoulakis
Hello all, First of all, I have a question posted on http://stackoverflow.com/questions/37732978/join-two-streams-using-a-count-based-window . I am re-posting it on the mailing list in case some of you are not on SO. In addition, I would like to know what the difference is between Flink and other

Re: NotSerializableException

2016-06-09 Thread Tarandeep Singh
Thank you Aljoscha and Fabian for your replies. @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm afraid this is a bug", I am assuming you are referring to the Flink engine itself. @Fabian: thanks for the optimization tip. This is how I have got it working (with a hack): In my

Re: Maxby() and KeyBy() question

2016-06-09 Thread iñaki williams
Understood! I have created a WindowedStream and now it is working. Thanks! On Thursday, June 9, 2016, Fabian Hueske wrote: > Hi, > > you are computing a running aggregate, i.e., you're getting one output > record for each input record and the output record is the

Re: HBase reads and back pressure

2016-06-09 Thread Fabian Hueske
OK, this indicates that the operator following the source is a bottleneck. If that's the WindowOperator, it makes sense to try the refactoring of the WindowFunction. Alternatively, you can try to run that operator with a higher parallelism. 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <

Re: FlinkKafkaProducer API

2016-06-09 Thread Fabian Hueske
Great, thank you! 2016-06-09 17:38 GMT+02:00 Elias Levy : > > On Thu, Jun 9, 2016 at 5:16 AM, Fabian Hueske wrote: > >> thanks for your feedback. I think those are good observations and >> suggestions to improve the Kafka producers. >> The best

Re: HBase reads and back pressure

2016-06-09 Thread Christophe Salperwyck
Hi Fabian, Thanks for the help, I will try that. The backpressure was on the source (HBase). Christophe 2016-06-09 16:38 GMT+02:00 Fabian Hueske : > Hi Christophe, > > where does the backpressure appear? In front of the sink operator or > before the window operator? > > In

Re: FlinkKafkaProducer API

2016-06-09 Thread Elias Levy
On Thu, Jun 9, 2016 at 5:16 AM, Fabian Hueske wrote: > thanks for your feedback. I think those are good observations and > suggestions to improve the Kafka producers. > The best place to discuss such improvements is the dev mailing list. > > Would you like to repost your mail

Re: Data Source Generator emits 4 instances of the same tuple

2016-06-09 Thread Fabian Hueske
We solved this problem yesterday at the Flink Hackathon. The issue was that the source function was started with parallelism 4 and each function read the whole file. Cheers, Fabian 2016-06-06 16:53 GMT+02:00 Biplob Biswas : > Hi, > > I tried streaming the source data 2

Re: How to maintain the state of a variable in a map transformation.

2016-06-09 Thread Fabian Hueske
Hi, 1) Yes, that is correct. If you set the parallelism of an operator to 1, it is only executed on a single node. It depends on your application whether you need a global state or whether multiple local states are OK. 2) Flink programs follow the concept of a data flow. There is no communication between

Re: HBase reads and back pressure

2016-06-09 Thread Fabian Hueske
Hi Christophe, where does the backpressure appear? In front of the sink operator or before the window operator? In any case, I think you can improve your WindowFunction if you convert parts of it into a FoldFunction. The FoldFunction would take care of the statistics
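
A hedged sketch of that suggestion, assuming the goal is a daily average per series: a FoldFunction keeps only a running (sum, count) pair per key and window instead of buffering all elements for a WindowFunction.

    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class IncrementalAverage {

        // accumulator is (sum, count); the average is sum/count once the window fires
        public static class SumCountFold
                implements FoldFunction<Tuple2<String, Double>, Tuple2<Double, Long>> {
            @Override
            public Tuple2<Double, Long> fold(Tuple2<Double, Long> acc, Tuple2<String, Double> value) {
                return Tuple2.of(acc.f0 + value.f1, acc.f1 + 1);
            }
        }

        public static DataStream<Tuple2<Double, Long>> dailySumCount(
                DataStream<Tuple2<String, Double>> readings) {
            return readings
                    .keyBy(0)                     // key by series id
                    .timeWindow(Time.days(1))     // daily aggregation
                    .fold(Tuple2.of(0.0, 0L), new SumCountFold());
        }
    }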

Re: Maxby() and KeyBy() question

2016-06-09 Thread Fabian Hueske
Hi, you are computing a running aggregate, i.e., you're getting one output record for each input record and the output record is the record with the largest value observed so far. If the record with the largest value is the first, the record is sent out another time. This is what happened with
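
A small sketch of the difference, with an assumed (name, odd) tuple layout: the first variant is the running aggregate described above (one output per input record), the second emits one maximum per key and window.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class MaxByExamples {

        // running aggregate: emits the max seen so far for every incoming record
        public static DataStream<Tuple2<String, Double>> runningMax(DataStream<Tuple2<String, Double>> odds) {
            return odds.keyBy(0).maxBy(1);
        }

        // windowed aggregate: emits one record per key and 10-second window
        public static DataStream<Tuple2<String, Double>> windowedMax(DataStream<Tuple2<String, Double>> odds) {
            return odds.keyBy(0).timeWindow(Time.seconds(10)).maxBy(1);
        }
    }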

Re: NotSerializableException

2016-06-09 Thread Aljoscha Krettek
Hi, the problem is that the KeySelector is an anonymous inner class and as such has a reference to the outer RecordFilterer object. Normally, this would be rectified by the closure cleaner but the cleaner is not used in CoGroup.where(). I'm afraid this is a bug. Best, Aljoscha On Thu, 9 Jun 2016

Re: Strange behavior of DataStream.countWindow

2016-06-09 Thread Fabian Hueske
Hi Yukun, the problem is that the KeySelector is internally invoked multiple times. Hence, it must be deterministic, i.e., it must extract the same key for the same object if invoked multiple times. The documentation does not discuss this aspect and should be extended. Thanks for pointing out
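
A short illustration of that pitfall (made-up key logic): the first selector is deterministic, the second returns a different key on each invocation for the same record and therefore breaks keyed windows.

    import org.apache.flink.api.java.functions.KeySelector;

    public class KeySelectors {

        // OK: derived only from the record itself, same key on every invocation
        public static class ByFirstDigit implements KeySelector<String, Integer> {
            @Override
            public Integer getKey(String value) {
                return Character.getNumericValue(value.charAt(0));
            }
        }

        // NOT OK: non-deterministic, the key changes between invocations
        public static class RandomKey implements KeySelector<String, Integer> {
            @Override
            public Integer getKey(String value) {
                return java.util.concurrent.ThreadLocalRandom.current().nextInt(4);
            }
        }
    }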

Re: Hourly top-k statistics of DataStream

2016-06-09 Thread Philippe Caparroy
You should have a look at this project: https://github.com/addthis/stream-lib You can use it within Flink, storing intermediate values in a local state. > On 9 Jun 2016, at 15:29, Yukun Guo wrote: > > Thank you very much for the detailed answer. Now I understand a
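
A hedged, standalone sketch of what using stream-lib for top-k could look like; the StreamSummary API used here (capacity constructor, offer, topK) is assumed from the project's documentation. In a Flink job the summary would be built incrementally, e.g. as the accumulator of an hourly window, and kept in local state as suggested above.

    import com.clearspring.analytics.stream.Counter;
    import com.clearspring.analytics.stream.StreamSummary;

    import java.util.List;

    public class TopKSketchExample {
        public static void main(String[] args) {
            StreamSummary<String> summary = new StreamSummary<>(1000); // capacity bounds the memory footprint

            for (String word : new String[]{"a", "b", "a", "c", "a", "b"}) {
                summary.offer(word);
            }

            List<Counter<String>> top2 = summary.topK(2);
            for (Counter<String> c : top2) {
                System.out.println(c.getItem() + " -> " + c.getCount());
            }
        }
    }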

Re: Hourly top-k statistics of DataStream

2016-06-09 Thread Christophe Salperwyck
Hi, There are some implementations to do that with a low memory footprint. Have a look at the count-min sketch, for example. There are some Java implementations. Christophe 2016-06-09 15:29 GMT+02:00 Yukun Guo : > Thank you very much for the detailed answer. Now I understand a

Re: Using Flink watermarks and a large window state for scheduling

2016-06-09 Thread Aljoscha Krettek
Hi Josh, I'll have to think a bit about that one. Once I have something I'll get back to you. Best, Aljoscha On Wed, 8 Jun 2016 at 21:47 Josh wrote: > This is just a question about a potential use case for Flink: > > I have a Flink job which receives tuples with an event id

Re: How to maintain the state of a variable in a map transformation.

2016-06-09 Thread Ravikumar Hawaldar
Hi Fabian, Thank you for your answers. 1) If there is only a single instance of that function, then it defeats the purpose of distribution (correct me if I am wrong), so if I run with parallelism 1 on a cluster, does that mean it will execute on only one node? 2) I mean to say, when a map operator

HBase reads and back pressure

2016-06-09 Thread Christophe Salperwyck
Hi, I am writing a program to read time series from HBase and do some daily aggregations (Flink streaming). For now I am just computing some averages, so it is not very demanding, but my HBase reads get slower and slower (I have a few billion points to read). The back pressure is almost all the time close

Re: Flink Dashboard

2016-06-09 Thread leon_mclare
Hi Till, thanks for the clarification. It all makes sense now. So the keyBy call is more of a partitioning scheme and less of an operator, similar to Storm's field grouping, and Flink's other schemes such as forward and broadcast. The difference is that it produces KeyedStreams, which are a
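
A tiny sketch of that distinction (made-up tuple type): keyBy, broadcast and forward all just decide how records are routed to the next operator, but only keyBy yields a KeyedStream on which keyed state and windows can be used.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    public class PartitioningSketch {
        public static void sketch(DataStream<Tuple2<String, Integer>> in) {
            KeyedStream<Tuple2<String, Integer>, ?> keyed = in.keyBy(0);      // hash-partition by field 0
            DataStream<Tuple2<String, Integer>> broadcasted = in.broadcast(); // every parallel instance sees all records
            DataStream<Tuple2<String, Integer>> forwarded = in.forward();     // keep records in their local channel
        }
    }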

Maxby() and KeyBy() question

2016-06-09 Thread iñaki williams
Hi again! I am working with two DataStreams, and I want to get the maximum value from each pair of them, for example: // Informacion (matchName, LocalOdd, AwayOdd) Informacion info1 = new Informacion("Match1", 1.10, 3.22); Informacion info2 = new Informacion("Match2", 2.11, 1.10);

Re: How to maintain the state of a variable in a map transformation.

2016-06-09 Thread Fabian Hueske
Hi Ravikumar, I'll try to answer your questions: 1) If you set the parallelism of a map function to 1, there will be only a single instance of that function regardless of whether it is executed locally or remotely in a cluster. 2) Flink also supports aggregations (reduce, groupReduce, combine,
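
A minimal sketch of point 2 with made-up data, using the DataSet API: a keyed reduce lets Flink manage the aggregation state, so no user-held variable is needed.

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class AggregationSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, Integer>> counts = env.fromElements(
                    Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

            // group by the key field and sum the counts per key
            counts.groupBy(0)
                  .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                      @Override
                      public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                            Tuple2<String, Integer> b) {
                          return Tuple2.of(a.f0, a.f1 + b.f1);
                      }
                  })
                  .print();
        }
    }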

Re: FlinkKafkaProducer API

2016-06-09 Thread Fabian Hueske
Hi Elias, thanks for your feedback. I think those are good observations and suggestions to improve the Kafka producers. The best place to discuss such improvements is the dev mailing list. Would you like to repost your mail there or open JIRAs where the discussion about these changes can continue?

Re: NotSerializableException

2016-06-09 Thread Fabian Hueske
Hi Tarandeep, the exception suggests that Flink tries to serialize RecordsFilterer as a user function (this happens via Java serialization). I say "suggests" because the code that uses RecordsFilterer is not included. To me it looks like RecordsFilterer should not be used as a user function. It

Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

2016-06-09 Thread Till Rohrmann
Hi Ahmed, I tried setting up your use case and for me it all seems to work. However, I didn't use the Spring framework and executed the program in a local Flink cluster. Maybe you can compile a self-contained example (including example data) to reproduce your problem and send it to us. Cheers,

Strange behavior of DataStream.countWindow

2016-06-09 Thread Yukun Guo
I’m playing with the (Window)WordCount example from Flink QuickStart. I generate a DataStream consisting of 1000 Strings of random digits, which is windowed with a tumbling count window of 50 elements: import org.apache.flink.api.common.functions.FlatMapFunction; import
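
A minimal reconstruction of the setup described above (details such as the random generator are assumptions): 1000 strings of random digits, keyed and counted in tumbling count windows of 50 elements.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class CountWindowWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            List<String> digits = new ArrayList<>();
            Random rnd = new Random();
            for (int i = 0; i < 1000; i++) {
                digits.add(Integer.toString(rnd.nextInt(10)));
            }

            DataStream<String> source = env.fromCollection(digits);

            source.map(new MapFunction<String, Tuple2<String, Integer>>() {
                      @Override
                      public Tuple2<String, Integer> map(String s) {
                          return Tuple2.of(s, 1);
                      }
                  })
                  .keyBy(0)          // the key extraction must be deterministic (see Fabian's reply)
                  .countWindow(50)   // tumbling count window of 50 elements
                  .sum(1)
                  .print();

            env.execute("count-window word count");
        }
    }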

Re: Extracting Timestamp in MapFunction

2016-06-09 Thread Biplob Biswas
Hi Aljoscha, I went to the Flink hackathon by Buzzwords yesterday where Fabian and Robert helped me with this issue. Apparently I was assuming that the file would be handled in a single thread, but I was using ParallelSourceFunction and it was creating 4 different threads and thus reading the same

Re: How to maintain the state of a variable in a map transformation.

2016-06-09 Thread Ravikumar Hawaldar
Hi Till, Thank you for your answer. I have a couple of questions: 1) Setting parallelism on a single map function in local mode is fine, but in distributed mode will it work like local execution? 2) Is there any other way apart from setting parallelism? Like Spark's aggregate function? 3) Is it necessary that

Re: Scala case classes with a generic parameter

2016-06-09 Thread Aljoscha Krettek
Hi James, the TypeInformation must be available at the call site, not in the case class definition. In your WindowFunction you are using a TestGen[String] so it should suffice to add this line at some point before the call to apply(): implicit val testGenType =

Re: Extracting Timestamp in MapFunction

2016-06-09 Thread Aljoscha Krettek
Hi, could you try pulling the problem apart, i.e. determine at which point in the pipeline you have duplicate data. Is it after the sources or in the CoFlatMap or the Map after the reduce, for example? Cheers, Aljoscha On Wed, 1 Jun 2016 at 17:11 Biplob Biswas wrote:

Re: ClassCastException when redeploying Flink job on running cluster

2016-06-09 Thread Till Rohrmann
Great to hear :-) On Wed, Jun 8, 2016 at 7:45 PM, Josh wrote: > Thanks Till, your suggestion worked! > > I actually just created a new SpecificData for each > AvroDeserializationSchema instance, so I think it's still just as efficient. > > Josh > > On Wed, Jun 8, 2016 at 4:41