Re: RocksDB state checkpointing is expensive?

2016-04-07 Thread Krzysztof Zarzycki
OK, thanks Aljoscha for the info! Guys, great work on Flink, I really love it :) Cheers, Krzysztof On Thu, 7 Apr 2016 at 10:48, Aljoscha Krettek wrote: > Hi, > you are right. Currently there is no incremental checkpointing and > therefore, at each checkpoint, we essentially copy the who

Re: FromIteratorFunction problems

2016-04-07 Thread Chesnay Schepler
Hmm, maybe I was too quick with linking to the JIRA. As for an example: you can look at the streaming WindowJoin example. The sample data uses an Iterator (ThrottledIterator). Note that the iterator implementation used is part of Flink and also implements Serializable. On 07.04.2016 22:18, And

Re: FromIteratorFunction problems

2016-04-07 Thread Chesnay Schepler
you will find some information regarding this issue in this JIRA: https://issues.apache.org/jira/browse/FLINK-2608 On 07.04.2016 22:18, Andrew Whitaker wrote: Hi, I'm trying to get a simple example of a source backed by an iterator working. Here's the code I've got: ``` StreamExecutionEnvir

Re: Integrate Flink with S3 on EMR cluster

2016-04-07 Thread Timur Fayruzov
The exception does not show up in the console when I run the job, it only shows in the logs. I thought it means that it happens either on AM or TM (I assume what I see in stdout is client log). Is my thinking right? On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi wrote: > Hey Timur, > > Just had a

FromIteratorFunction problems

2016-04-07 Thread Andrew Whitaker
Hi, I'm trying to get a simple example of a source backed by an iterator working. Here's the code I've got: ``` StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); List list = Arrays.asList(1, 2); env.fromCollection(list.iterator(), Integer.class).print(); ```
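A minimal sketch of the point raised in the replies, assuming the failure comes from the iterator not being serializable (the truncated messages do not confirm this). It uses only the JDK, not Flink; `SerializableRangeIterator` and `isJavaSerializable` are hypothetical names introduced for illustration. The iterator returned by `Arrays.asList(1, 2).iterator()` does not implement `Serializable`, while a small hand-written iterator can:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical iterator over the integers in [start, end) that is also Serializable. */
class SerializableRangeIterator implements Iterator<Integer>, Serializable {
    private static final long serialVersionUID = 1L;
    private int current;
    private final int end;

    SerializableRangeIterator(int start, int end) {
        this.current = start;
        this.end = end;
    }

    @Override
    public boolean hasNext() {
        return current < end;
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current++;
    }
}

class SerializableIteratorSketch {
    /** Returns true if the object can be written with plain Java serialization. */
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        // Iterator of a list created by Arrays.asList: not Serializable.
        System.out.println(isJavaSerializable(Arrays.asList(1, 2).iterator()));
        // Hand-written iterator that implements Serializable.
        System.out.println(isJavaSerializable(new SerializableRangeIterator(1, 3)));
    }
}
```

If serialization is indeed the cause, an iterator like the second one (or the Flink-provided one mentioned in the reply) would be the kind of object to pass instead of `list.iterator()`.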

Re: mutable hashmap outside of stream, does it get snapshotted ?

2016-04-07 Thread Bart van Deenen
Thanks all! I was under the mistaken impression that Flink automagically did the snapshotting for me. The info is really clear, I'll have no trouble implementing it. Bart On Thu, Apr 7, 2016, at 18:40, Aljoscha Krettek wrote: > Hi, > good explanation and pointers! > > I just want to add that

Re: Integrate Flink with S3 on EMR cluster

2016-04-07 Thread Ufuk Celebi
Hey Timur, Just had a chat with Robert about this. I agree that the error message is confusing, but it is fine in this case. The file system classes are not on the class path of the client process, which is submitting the job. It fails to sample the input file sizes, but this is just an optimizati

Re: Integrate Flink with S3 on EMR cluster

2016-04-07 Thread Timur Fayruzov
There's one more filesystem integration failure that I have found. My job on a toy dataset succeeds, but the Flink log contains the following message: 2016-04-07 18:10:01,339 ERROR org.apache.flink.api.common.io.DelimitedInputFormat - Unexpected problen while getting the file statistics for f

Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Timur Fayruzov
Thank you, Till! Setting the flag in flink-conf.yaml worked, I'm very glad that it was resolved. Note, however, that passing it as an argument to the flink script did not work. I tried to pass it as: `-yDenv.java.opts="-Djava.library.path="`. I did not investigate any further at this time. Thanks, Timur O

Re: mutable hashmap outside of stream, does it get snapshotted ?

2016-04-07 Thread Aljoscha Krettek
Hi, good explanation and pointers! I just want to add that the uriLookup table in your example is not really shared between your operator instances in a distributed setting. When serializing your transformations the current state of the HashMap is serialized with them because it is in the closure

Re: WindowedStream operation questions

2016-04-07 Thread Aljoscha Krettek
Hi, I saw the issue you opened. :-) I'll try and figure out how to get all the Scaladocs on there. Regarding the other questions. A WindowedStream is basically not a Stream in itself but a stepping stone towards specifying a windowed operation that results in a new Stream. So the pattern always ha

Re: mutable hashmap outside of stream, does it get snapshotted ?

2016-04-07 Thread Stefano Baghino
Hi Bart, to make sure that the state is checkpointed you have to: 1. configure your Flink installation with a reliable state backend (optional for development, you can read more about it here

WindowedStream operation questions

2016-04-07 Thread Elias Levy
An observation and a couple of questions from a novice. The observation: The Flink web site makes available ScalaDocs for org.apache.flink.api.scala but not for org.apache.flink.streaming.api.scala. Now for the questions: Why can't you use map to transform a data stream, say convert all the eleme

Re: Example - Reading Avro Generic records

2016-04-07 Thread Sourigna Phetsarath
Tarandeep, Also, in your code example, when *reuseAvroValue* is *false* the code will fail with this message: java.lang.RuntimeException: The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The class is no proper class, it is either abstract, an interface, or a primitive type.

Re: Example - Reading Avro Generic records

2016-04-07 Thread Sourigna Phetsarath
Tarandeep, Thanks for pasting your code! I have a PR ready that extends AvroInputFormat and will submit it soon. Still waiting for the legal team at AOL to approve it. -Gna On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh wrote: > Thank you Gna for opening the ticket. > > I looked into AvroIn

Re: Kafka state backend?

2016-04-07 Thread Zach Cox
Ah I didn't see that, thanks for the link! Glad this is being discussed. On Thu, Apr 7, 2016 at 5:06 AM Aljoscha Krettek wrote: > Hi Zach, > I'm afraid someone already beat you to it :-) > https://issues.apache.org/jira/browse/FLINK-3692 > > In the issue we touch on some of the difficulties with

mutable hashmap outside of stream, does it get snapshotted ?

2016-04-07 Thread Bart van Deenen
Hi all, I have a DataStream transformation that updates a mutable HashMap that exists outside of the stream. So it's something like object FlinkJob { val uriLookup = mutable.HashMap.empty[String, Int] def main(args: Array[String]) { val stream: DataStream = ... stream.keyBy

Limit buffer size for a job

2016-04-07 Thread Andrew Ge Wu
Hi guys, We have a prioritized queue where high-priority items can jump the queue, and we do not want to cache too many records in the buffer. Is there a way to configure my streaming source to buffer less, so that the source always fetches the latest high-priority records? Any suggestions? Thanks! Andr

Re: Handling large state (incremental snapshot?)

2016-04-07 Thread Shannon Carey
HyperLogLog is worth a mention, but only if you don't mind some inaccuracy. On 4/7/16, 8:41 AM, "Hironori Ogibayashi" wrote: >I tried RocksDB, but the result was almost the same. > >I used the following code and put 2.6million distinct records into Kafka. >After processing all records, the sta

Re: Find differences

2016-04-07 Thread Fabian Hueske
I would go with an outer join as Stefano suggested. Outer joins can be executed as hash joins, which will probably be more efficient than using a sort-based groupBy/reduceGroup. Also, outer joins are more intuitive and simpler, IMO. 2016-04-07 12:35 GMT+02:00 Stefano Baghino : > Perhaps an outer

Re: State in external db (dynamodb)

2016-04-07 Thread Aljoscha Krettek
Hi, regarding windows and incremental aggregation. This is already happening in Flink as of now. When you give a ReduceFunction on a window, which "sum" internally does, the result for a window is incrementally updated whenever a new element comes in. This incremental aggregation only happens when
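The incremental aggregation described in this message can be illustrated with a small JDK-only sketch. It is not Flink's implementation: `IncrementalWindowSketch`, `add`, and `fire` are hypothetical names, and the tumbling-window assignment by timestamp is an assumption made for the example. The idea shown is that only one partial result is kept per window and it is updated as each element arrives, instead of buffering all elements until the window fires:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Hypothetical tumbling-window aggregator that keeps one partial result per window. */
class IncrementalWindowSketch<T> {
    private final long windowSizeMs;
    private final BinaryOperator<T> reduce;
    // One running value per window start; no per-element buffering.
    private final Map<Long, T> partialByWindow = new HashMap<>();

    IncrementalWindowSketch(long windowSizeMs, BinaryOperator<T> reduce) {
        this.windowSizeMs = windowSizeMs;
        this.reduce = reduce;
    }

    /** Folds the element into the partial result of its window (timestamps assumed non-negative). */
    void add(long timestampMs, T value) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        partialByWindow.merge(windowStart, value, reduce);
    }

    /** Emits and discards the result of a window; returns null if the window saw no elements. */
    T fire(long windowStart) {
        return partialByWindow.remove(windowStart);
    }

    public static void main(String[] args) {
        IncrementalWindowSketch<Integer> sum = new IncrementalWindowSketch<>(1000L, Integer::sum);
        sum.add(100L, 1);
        sum.add(900L, 2);
        sum.add(1500L, 5);
        System.out.println(sum.fire(0L));
        System.out.println(sum.fire(1000L));
    }
}
```

With a reduce-style function the state per window stays constant in size, which is the property the message attributes to giving a ReduceFunction on a window.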

Re: State in external db (dynamodb)

2016-04-07 Thread Shannon Carey
Thanks very kindly for your response, Stephan! We will definitely use a custom sink for persistence of idempotent mutations whenever possible. Exposing state as read-only to external systems is a complication we will try to avoid. Also, we will definitely only write to the DB upon checkpoint, a

Re: Flink CEP AbstractCEPPatternOperator fail after event detection

2016-04-07 Thread norman sp
Hi, here is an example input that produces the error. These are read from Kafka. 01:43:43.5921 2121{"Pressure target - Value":"6"} 01:43:43.5961 2121{"Flow target - Value":"23"} 01:43:44.2631 2121{"Pressure target - Value":"7"} 01:43:44.9721 2121

Re: threads, parallelism and task managers

2016-04-07 Thread Flavio Pompermaier
We've finally created a running example (for Flink 0.10.2) of our improved JDBC InputFormat that you can run from an IDE (it creates an in-memory Derby database with 1000 rows and batches of 10) at https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351. The first time you run the progr

Re: Find differences

2016-04-07 Thread Stefano Baghino
Perhaps an outer join can do the trick as well, but I don't know which one would perform better. On Thu, Apr 7, 2016 at 12:05 PM, Lydia Ickler wrote: > Never mind! I figured it out with groupBy and > reduceGroup > > Sent from my iPhone > > > On 07.04.2016 at 11:51, Lydia Ickler wrote: >

Re: Flink CEP AbstractCEPPatternOperator fail after event detection

2016-04-07 Thread Till Rohrmann
Hi Norman, could you provide me an example input data set which produces the error? E.g. the list of strings you inserted into Kafka/read from Kafka? Cheers, Till On Thu, Apr 7, 2016 at 11:05 AM, norman sp wrote: > Hi Till, > thank you. here's the code: > > public class CepStorzSimulator { > >

Re: Kafka state backend?

2016-04-07 Thread Aljoscha Krettek
Hi Zach, I'm afraid someone already beat you to it :-) https://issues.apache.org/jira/browse/FLINK-3692 In the issue we touch on some of the difficulties with this that stem from the differences in the guarantees that Flink and Samza try to give. Cheers, Aljoscha On Tue, 5 Apr 2016 at 22:24 Zach

Re: Find differences

2016-04-07 Thread Lydia Ickler
Never mind! I figured it out with groupBy and reduceGroup. Sent from my iPhone > On 07.04.2016 at 11:51, Lydia Ickler wrote: > > Hi, > > If I have 2 DataSets A and B of type Tuple3 how would > I get a subset of A (based on the fields (0,1)) that does not occur in B? > Is there maybe an

Re: Multiple operations on a WindowedStream

2016-04-07 Thread Aljoscha Krettek
Hi, the code seems alright? Did you try looking at the Flink Dashboard to check out whether any of the operations are sending elements? Cheers, Aljoscha On Tue, 5 Apr 2016 at 21:00 Kanak Biscuitwala wrote: > This worked when I ran my test code locally, but I'm seeing nothing reach > my sink whe

Find differences

2016-04-07 Thread Lydia Ickler
Hi, If I have 2 DataSets A and B of type Tuple3, how would I get a subset of A (based on the fields (0,1)) that does not occur in B? Is there maybe an already implemented method? Best regards, Lydia Sent from my iPhone
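The operation asked about here is an anti-join on the key formed by fields 0 and 1. The sketch below shows only the semantics with plain JDK collections, not the Flink DataSet API; `AntiJoinSketch` and `rowsOfANotInB` are hypothetical names, and `Object[]` rows stand in for `Tuple3` because the tuple's type parameters are not given in the message:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AntiJoinSketch {
    /** Returns the rows of a whose (field 0, field 1) key does not occur in b. */
    static List<Object[]> rowsOfANotInB(List<Object[]> a, List<Object[]> b) {
        // Collect every (field 0, field 1) key present in b.
        Set<List<Object>> keysInB = new HashSet<>();
        for (Object[] row : b) {
            keysInB.add(Arrays.asList(row[0], row[1]));
        }
        // Keep only the rows of a whose key was not seen in b.
        List<Object[]> result = new ArrayList<>();
        for (Object[] row : a) {
            if (!keysInB.contains(Arrays.asList(row[0], row[1]))) {
                result.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> a = Arrays.asList(
                new Object[] {1, "x", 10.0},
                new Object[] {2, "y", 20.0});
        List<Object[]> b = Arrays.asList(
                new Object[] {1, "x", 99.0}); // same key as the first row of a
        for (Object[] row : rowsOfANotInB(a, b)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Field 2 is ignored when matching, so a row of A is dropped as soon as any row of B shares its first two fields.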

Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Till Rohrmann
For passing the dynamic property directly when running things on YARN, you have to use -yDenv.java.opts="..." On Thu, Apr 7, 2016 at 11:42 AM, Till Rohrmann wrote: > Hi Timur, > > what you can try doing is to pass the JVM parameter > -Djava.library.path= via the env.java.opts to the system. Yo

Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Till Rohrmann
Hi Timur, what you can try doing is to pass the JVM parameter -Djava.library.path= via the env.java.opts to the system. You simply have to add env.java.opts: "-Djava.library.path=" in the flink-conf.yaml or via -Denv.java.opts="-Djava.library.path=", if I’m not mistaken. Cheers Till On Thu, Ap

Re: Flink CEP AbstractCEPPatternOperator fail after event detection

2016-04-07 Thread norman sp
Hi Till, thank you. here's the code: public class CepStorzSimulator { public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromArgs(args); if(parameter

Re: YARN High Availability

2016-04-07 Thread Robert Metzger
Cool. I'm happy that you were able to validate the issue. I'll write a fix for it: https://issues.apache.org/jira/browse/FLINK-3712 On Thu, Apr 7, 2016 at 10:43 AM, Konstantin Knauf < konstantin.kn...@tngtech.com> wrote: > Hi everyone, > > thanks to Robert, I found the problem. > > I was setting

Re: Flink CEP AbstractCEPPatternOperator fail after event detection

2016-04-07 Thread Till Rohrmann
Hi Norman, this error is exactly what I thought I had fixed. I guess there is still another case where a premature pruning can happen in the SharedBuffer. Could you maybe send me the example code with which you could produce the error? The input data would also be very helpful. Then I can debug it

Re: RocksDB state checkpointing is expensive?

2016-04-07 Thread Aljoscha Krettek
Hi, you are right. Currently there is no incremental checkpointing and therefore, at each checkpoint, we essentially copy the whole RocksDB database to HDFS (or whatever filesystem you chose as a backup location). As far as I know, Stephan will start working on adding support for incremental snapsh

Re: Handling large state (incremental snapshot?)

2016-04-07 Thread Aljoscha Krettek
Ah yes, you're right. With the non-keyed stream it doesn't make a big difference because it's only one big state value. The throughput still seems quite low. Have you ever tried looking at the "back pressure" tab on the Flink dashboard? For this I would suggest disabling chaining, so that every o

Re: YARN High Availability

2016-04-07 Thread Konstantin Knauf
Hi everyone, thanks to Robert, I found the problem. I was setting "recovery.zookeeper.path.root" on the command line with -yD. Apparently this is currently not supported. You need to set the parameter in flink-conf.yaml. Cheers, Konstantin On 05.04.2016 12:52, Konstantin Knauf wrote: > Hi R

Re: Accessing RDF triples using Flink

2016-04-07 Thread Flavio Pompermaier
Hi Ritesh, Jena could store triples in NQuadsInputFormat, which is a HadoopInputFormat, so that you can read the data efficiently with Flink. Unfortunately, I remember that I had some problems using it, so I just export my Jena model as NQuads and then parse it efficiently with Flink as a text fil

Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Timur Fayruzov
There is a hack for this issue: copying my native library to $HADOOP_HOME/lib/native makes it discoverable and the program runs; however, this is not an appropriate solution and it seems to be fragile. I tried to find where the 'lib/native' path appears in the configuration and found 2 places: hadoop-env

Flink event processing immediate feedback

2016-04-07 Thread igor.berman
Hi, Suppose I have a web-facing frontend that receives a stream of events (HTTP calls). I need to process the event stream, do some aggregations over those events, and write aggregated statistics to HBase; so far Flink seems a perfect match. However, in some cases an event should trigger some alert and frontend

RocksDB state checkpointing is expensive?

2016-04-07 Thread Krzysztof Zarzycki
Hi, I saw the documentation and source code of the state management with RocksDB and, before I use it, I'm concerned about one thing: Am I right that currently, when state is being checkpointed, the whole RocksDB state is snapshotted? There is no incremental, diff snapshotting, is there? If so, this seems