Re: where can get the summary changes between flink-1.0 and flink-0.10
I think Chiwan's estimate is accurate. Anything between 2 and 4 weeks is realistic in my opinion. We will make sure that the release comes with a migration/breaking-changes guide, so you will have a smooth experience when upgrading. In the meantime, you can also work with the current master/snapshot version [1]. I'm curious, what are you trying to solve with the UID method? – Ufuk [1] http://flink.apache.org/contribute-code.html#snapshots-nightly-builds On Thu, Feb 18, 2016 at 2:17 AM, Chiwan Park wrote: > We’re testing a release candidate for 1.0 [1] currently. You can use the new > features. I’m not sure because I’m not in the PMC of Flink, but I think we can > release in a month. > > Regards, > Chiwan Park > > [1]: > http://mail-archives.apache.org/mod_mbox/flink-user/201602.mbox/%3CCAGr9p8AkiT0CT_YBwMhHCUYmoC9Stw%3DLZzkNs2iRLNJ5rLMzdA%40mail.gmail.com%3E > >> On Feb 17, 2016, at 5:08 PM, wangzhijiang999 >> wrote: >> >> Hi Chiwan, >> >> Thank you for the instant reply. When will the official flink-1.0 be >> released? Can you give a rough estimate? I am interested in the new features >> of flink-1.0, like operator uid, in order to solve my current problem. >> >> Regards, >> >> Zhijiang Wang >> >> -- >> From: Chiwan Park >> Sent: Wednesday, February 17, 2016, 14:43 >> To: user, wangzhijiang999 >> Subject: Re: where can get the summary changes between flink-1.0 and flink-0.10 >> >> Hi Zhijiang, >> >> We have wiki pages describing the Flink 1.0 release [1] [2]. But the >> pages are not updated in real time. It is possible that there are some >> changes that haven’t been described. >> >> After releasing 1.0 officially, we may post an article covering the >> changes in 1.0 to the Flink blog [3]. 
>> >> Regards, >> Chiwan Park >> >> [1]: https://cwiki.apache.org/confluence/display/FLINK/1.0+Release >> [2]: >> https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version >> [3]: http://flink.apache.org/blog/ >> >> > On Feb 17, 2016, at 3:34 PM, wangzhijiang999 >> > wrote: >> > >> > Hi, >> > Where can get the summary changes between flink-1.0 and flink-0.10, >> > thank you in advance! >> > >> > >> > >> > >> > >> > Best Regards, >> > >> > Zhijiang Wang >> >
Re: Finding the average temperature
Hello Aljoscha Thanks very much for clarifying the role of Pre-Aggregation (rather, Incr-Aggregation, now that I understand the intention). It helps me understand. Thanks to Stefano too, for keeping at my original question. My current understanding is that if I have to compute the average of a streaming set of _temperatures_, then the *best* way to accomplish this is by employing *one* node (or thread, on my laptop), losing speed but gaining deterministic behaviour in the process. I can decide to capture the average either by grouping the temperatures by count or by time. Because I am sliding the window anyway, I don't run the risk of elements accumulating in the window and a buffer overrun. Could you please confirm whether my understanding is correct? I feel happy if I 'understand' the basis of a design well! :-) -- Nirmalya -- Software Technologist http://www.linkedin.com/in/nirmalyasengupta "If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them."
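For what it's worth, the incremental (pre-)aggregation discussed above can be sketched outside of Flink. The following is a minimal, hypothetical Python illustration (not the Flink API): a count-based tumbling window whose state is only a running (sum, count) accumulator, so no buffer of raw temperatures can build up.

```python
def windowed_averages(temps, window_size):
    """Average per count-based tumbling window, aggregated incrementally."""
    # The accumulator holds only (sum, count), never the buffered elements.
    acc_sum, acc_cnt, averages = 0.0, 0, []
    for t in temps:
        acc_sum += t
        acc_cnt += 1
        if acc_cnt == window_size:          # window fires
            averages.append(acc_sum / acc_cnt)
            acc_sum, acc_cnt = 0.0, 0       # tumble: reset the accumulator
    return averages
```

A sliding window works the same way, except that several accumulators are live at once; the key point is that state stays O(1) per window rather than O(elements).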
Re: where can get the summary changes between flink-1.0 and flink-0.10
We’re testing a release candidate for 1.0 [1] currently. You can use the new features. I’m not sure because I’m not in the PMC of Flink, but I think we can release in a month. Regards, Chiwan Park [1]: http://mail-archives.apache.org/mod_mbox/flink-user/201602.mbox/%3CCAGr9p8AkiT0CT_YBwMhHCUYmoC9Stw%3DLZzkNs2iRLNJ5rLMzdA%40mail.gmail.com%3E > On Feb 17, 2016, at 5:08 PM, wangzhijiang999 > wrote: > > Hi Chiwan, > > Thank you for the instant reply. When will the official flink-1.0 be > released? Can you give a rough estimate? I am interested in the new features > of flink-1.0, like operator uid, in order to solve my current problem. > > Regards, > > Zhijiang Wang > > -- > From: Chiwan Park > Sent: Wednesday, February 17, 2016, 14:43 > To: user, wangzhijiang999 > Subject: Re: where can get the summary changes between flink-1.0 and flink-0.10 > > Hi Zhijiang, > > We have wiki pages describing the Flink 1.0 release [1] [2]. But the > pages are not updated in real time. It is possible that there are some changes > that haven’t been described. > > After releasing 1.0 officially, we may post an article covering the > changes in 1.0 to the Flink blog [3]. > > Regards, > Chiwan Park > > [1]: https://cwiki.apache.org/confluence/display/FLINK/1.0+Release > [2]: > https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version > [3]: http://flink.apache.org/blog/ > > > On Feb 17, 2016, at 3:34 PM, wangzhijiang999 > > wrote: > > > > Hi, > > Where can I get the summary changes between flink-1.0 and flink-0.10? > > Thank you in advance! > > > > Best Regards, > > > > Zhijiang Wang >
Re: Availability for the ElasticSearch 2 streaming connector
I recently did exactly what Robert described: I copied the code from this (closed) PR https://github.com/apache/flink/pull/1479, modified it a bit, and just included it in my own project that uses the Elasticsearch 2 Java API. Seems to work well. Here are the files so you can do the same: https://gist.github.com/zcox/59e486be7aeeca381be0 -Zach On Wed, Feb 17, 2016 at 4:06 PM Suneel Marthi wrote: > Hey I missed this thread, sorry about that. > > I have a basic connector working with ES 2.0 which I can push out. It's > not optimized yet and I don't have the time to look at it; if someone would > like to take it over, go ahead and I can send a PR. > > On Wed, Feb 17, 2016 at 4:57 PM, Robert Metzger > wrote: > >> Hi Mihail, >> >> It seems that nobody is actively working on the elasticsearch2 connector >> right now. The 1.0.0 release is already feature frozen; only bug fixes or >> (some) pending pull requests go in. >> >> What you can always do is copy the code from our current elasticsearch >> connector, set the dependency to the version you would like to use and >> adapt our code to their API changes. I think it might not take much time to >> get it working. >> (The reason why we usually need more time for stuff like this is >> integration tests and documentation.) >> >> Please let me know if that solution doesn't work for you. >> >> Regards, >> Robert >> >> >> On Tue, Feb 16, 2016 at 2:53 PM, Vieru, Mihail >> wrote: >> >>> Hi, >>> >>> in reference to this ticket >>> https://issues.apache.org/jira/browse/FLINK-3115 when do you think that >>> an ElasticSearch 2 streaming connector will become available? Will it make >>> it for the 1.0 release? >>> >>> That would be great, as we are planning to use that particular version >>> of ElasticSearch in the very near future. >>> >>> Best regards, >>> Mihail >>> >> >> >
Re: Availability for the ElasticSearch 2 streaming connector
Hey I missed this thread, sorry about that. I have a basic connector working with ES 2.0 which I can push out. It's not optimized yet and I don't have the time to look at it; if someone would like to take it over, go ahead and I can send a PR. On Wed, Feb 17, 2016 at 4:57 PM, Robert Metzger wrote: > Hi Mihail, > > It seems that nobody is actively working on the elasticsearch2 connector > right now. The 1.0.0 release is already feature frozen; only bug fixes or > (some) pending pull requests go in. > > What you can always do is copy the code from our current elasticsearch > connector, set the dependency to the version you would like to use and > adapt our code to their API changes. I think it might not take much time to > get it working. > (The reason why we usually need more time for stuff like this is > integration tests and documentation.) > > Please let me know if that solution doesn't work for you. > > Regards, > Robert > > > On Tue, Feb 16, 2016 at 2:53 PM, Vieru, Mihail > wrote: > >> Hi, >> >> in reference to this ticket >> https://issues.apache.org/jira/browse/FLINK-3115 when do you think that >> an ElasticSearch 2 streaming connector will become available? Will it make >> it for the 1.0 release? >> >> That would be great, as we are planning to use that particular version of >> ElasticSearch in the very near future. >> >> Best regards, >> Mihail >> > >
Re: Availability for the ElasticSearch 2 streaming connector
Hi Mihail, It seems that nobody is actively working on the elasticsearch2 connector right now. The 1.0.0 release is already feature frozen; only bug fixes or (some) pending pull requests go in. What you can always do is copy the code from our current elasticsearch connector, set the dependency to the version you would like to use and adapt our code to their API changes. I think it might not take much time to get it working. (The reason why we usually need more time for stuff like this is integration tests and documentation.) Please let me know if that solution doesn't work for you. Regards, Robert On Tue, Feb 16, 2016 at 2:53 PM, Vieru, Mihail wrote: > Hi, > > in reference to this ticket > https://issues.apache.org/jira/browse/FLINK-3115 when do you think that > an ElasticSearch 2 streaming connector will become available? Will it make > it for the 1.0 release? > > That would be great, as we are planning to use that particular version of > ElasticSearch in the very near future. > > Best regards, > Mihail >
Re: Read once input data?
I'll be interested to hear more about this when you implement it. Thank you On Wed, Feb 17, 2016 at 4:44 AM, Flavio Pompermaier wrote: > In my use case I thought to persist the dataset on Tachyon to reuse it, in > order to speed up its reading... do you think it could help? > > > On Tue, Feb 16, 2016 at 10:28 PM, Saliya Ekanayake > wrote: > >> Thank you. I'll check this. >> >> On Tue, Feb 16, 2016 at 4:01 PM, Fabian Hueske wrote: >> >>> Broadcasted DataSets are stored on the JVM heap of each task manager >>> (but shared among multiple slots on the same TM), hence the size >>> restriction. >>> >>> There are two ways to retrieve a DataSet (such as the result of a >>> reduce). >>> 1) If you want to fetch the result into your client program, use >>> DataSet.collect(). This immediately triggers an execution and fetches the >>> result from the cluster. >>> 2) If you want to use the result for a computation in the cluster, use >>> broadcast sets as described above. >>> >>> 2016-02-16 21:54 GMT+01:00 Saliya Ekanayake : >>> Thank you, yes, this makes sense. The broadcasted data in my case would be a large array of 3D coordinates. On a side note, how can I take the output from a reduce function? I can see methods to write it to a given output, but is it possible to retrieve the reduced result back to the program - like a double value representing the average in the previous example? On Tue, Feb 16, 2016 at 3:47 PM, Fabian Hueske wrote: > You can use so-called BroadcastSets to send any sufficiently small > DataSet (such as a computed average) to any other function and use it > there. > However, in your case you'll end up with a data flow that branches (at > the source) and merges again (when the average is sent to the second map). > Such patterns can cause deadlocks and can therefore not be pipelined, > which means that the data before the branch is written to disk and read > again. > In your case it might be even better to read the data twice instead of > reading, writing, and reading it. 
> > Fabian > > 2016-02-16 21:15 GMT+01:00 Saliya Ekanayake : > >> I looked at the samples and I think what you meant is clear, but I >> didn't find a solution for my need. In my case, I want to use the result >> from the first map operation before I can apply the second map on the >> *same* data set. For simplicity, let's say I have a bunch of short >> values represented as my data set. Then I need to find their average, so I >> use a map and reduce. Then I want to map these short values with another >> function, but it needs that average computed in the beginning to work >> correctly. >> >> Is this possible without doing multiple reads of the input data to >> create the same dataset? >> >> Thank you, >> saliya >> >> On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske >> wrote: >> >>> Yes, if you implement both maps in a single job, data is read once. >>> >>> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake : >>> Fabian, I have a quick follow-up question on what you suggested. When streaming the same data through different maps, were you implying that everything goes as a single job in Flink, so the data read happens only once? Thanks, Saliya On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske wrote: > It is not possible to "pin" data sets in memory yet. > However, you can stream the same data set through two different > mappers at the same time. > > For instance you can have a job like: > > /---> Map 1 --> Sink1 > Source --< > \---> Map 2 --> Sink2 > > and execute it at once. > For that you define your data flow and call execute once after all > sinks have been created. > > Best, Fabian > > 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake : > >> Fabian, >> >> count() was just an example. What I would like to do is, say, run >> two map operations on the dataset (ds). Each map will have its own >> reduction, so is there a way to avoid creating two jobs for such a >> scenario? >> >> The reason is, reading these binary matrices is expensive. 
In >> our current MPI implementation, I am using memory maps for faster >> loading >> and reuse. >> >> Thank you, >> Saliya >> >> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske > > wrote: >> >>> Hi, >>> >>> it looks like you are executing two distinct Flink jobs. >>> DataSet.count() triggers the execution of a new job. If you have >>> an execute() call in your
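The pattern discussed in this thread can be sketched in plain Python (illustrative only, not Flink code; the function name is made up): one pass computes the aggregate, which then plays the role of a broadcast value for the second map, so the input is not read twice. In the actual Flink DataSet API the aggregate would reach the second mapper either via `collect()` in the client or via `withBroadcastSet(...)` on the operator.

```python
def center_values(records):
    """Two-phase computation over data that is read only once."""
    # Phase 1 (map + reduce): compute the aggregate in a single pass.
    avg = sum(records) / len(records)
    # Phase 2 (second map): apply the aggregate to the same data,
    # as if it had been handed in as a broadcast variable.
    return [r - avg for r in records]
```

In a real job the trade-off Fabian mentions still applies: branching at the source and merging later may force materialization, so reading twice can sometimes be cheaper than read-write-read.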
Re: streaming hdfs sub folders
I forgot to mention I'm using an AvroInputFormat to read the file (that might be relevant to how the flag needs to be applied). See the code snippet below: DataStream inStream = env.readFile(new AvroInputFormat(new Path(filePath), EndSongCleanedPq.class), filePath); On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann wrote: > The program is a DataStream program; it usually gets the data from > kafka. It's an anomaly detection program that learns from the stream > itself. The reason I want to read from files is to test different settings > of the algorithm and compare them. > > I think I don't need to replay things in the exact order (which is not > possible with parallel reads anyway) and I have written the program so it > can deal with out-of-order events. > I only need the subfolders to be processed roughly in order. It's fine to > process some stuff from 01 before everything from 00 is finished; if I get > records from all 24 subfolders at the same time things will break though. > If I set the flag, will it try to get data from all subdirs in parallel or > will it go subdir by subdir? > > Also, can you point me to some documentation or something where I can see > how to set the flag? > > cheers Martin > > > > > On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen wrote: > >> Hi! >> >> Going through nested folders is pretty simple; there is a flag on the >> FileInputFormat that makes sure those are read. >> >> Tricky is the part that all "00" files should be read before the "01" >> files. If you still want parallel reads, that means you need to sync at >> some point, wait for all parallel parts to finish with the "00" work before >> anyone may start with the "01" work. >> >> Is your training program a DataStream or a DataSet program? >> >> Stephan >> >> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann wrote: >> >>> Hi, >>> >>> I have a streaming machine learning job that usually runs with input >>> from kafka. To tweak the models I need to run on some old data from HDFS. 
>> >> Unfortunately the data on HDFS is spread out over several subfolders. >> Basically I have a date folder with one subfolder for each hour; within those are >> the actual input files I'm interested in. >> >> Basically what I need is a source that goes through the subfolders in >> order and streams the files into the program. I'm using event timestamps so >> all files in 00 need to be processed before 01. >> >> Does anyone have an idea on how to do this? >> >> cheers Martin >> >> >
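A driver-side sketch of the ordering requirement in this thread, in plain Python with made-up names (not the Flink API): visit the hour subfolders in sorted order so everything under "00" is emitted before anything under "01". In Flink itself, recursive directory reading is enabled on the input format (in 1.0-era versions, if I recall correctly, via the `recursive.file.enumeration` configuration parameter passed to `FileInputFormat.configure()`); the strict hour-by-hour ordering would still have to be handled by the job.

```python
import os

def ordered_paths(root, listing):
    """Order input paths hour-folder by hour-folder ("00", "01", ...).

    `listing` maps each hour subfolder name to the file names inside it
    (a stand-in for an HDFS directory listing). Folders are visited in
    sorted order, so all of "00" precedes anything from "01".
    """
    paths = []
    for hour in sorted(listing):
        for name in sorted(listing[hour]):
            paths.append(os.path.join(root, hour, name))
    return paths
```

Feeding the files to a single-parallelism source in this order gives the "roughly in order" behaviour asked for, at the cost of parallel reads.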
Re: Behaviour of Windows on Shutdown
Haha :) It arrived, somehow my first mail got sent twice. On 17.02.2016 20:45, Aljoscha Krettek wrote: > Hi, > did the first mail from me not arrive? I’m sending it again: > > we changed it a while back to not emit any buffered elements at the end > because we noticed that that would be a more natural behavior. This must be > an oversight on our part. I’ll make sure that the 1.0 release will have the > correct behavior. > >> On 17 Feb 2016, at 16:18, Konstantin Knauf >> wrote: >> >> Hi everyone, >> >> if a DataStream is created with .fromElements(...) all windows emit all >> buffered records at the end of the stream. I have two questions about this: >> >> 1) Is this only the case for streams created with .fromElements() or >> does this happen in any streaming application on shutdown? >> >> 2) Is there a configuration option to disable this behaviour, such that >> buffered events remaining in windows are just discarded? >> >> In our application it is critical, that only events, which were >> explicitly fired are emitted from the windows. >> >> Cheers and thank you, >> >> Konstantin >> > > -- Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082
Re: Behaviour of Windows on Shutdown
Hi Aljoscha, thank you for the quick answer and sorry for the double post. Cheers, Konstantin On 17.02.2016 19:20, Aljoscha Krettek wrote: > Hi, > we changed it a while back to not emit any buffered elements at the end > because we noticed that that would be a more natural behavior. This must be > an oversight on our part. I’ll make sure that the 1.0 release will have the > correct behavior. >> On 17 Feb 2016, at 16:35, Konstantin Knauf >> wrote: >> >> Hi everyone, >> >> if a DataStream is created with .fromElements(...) all windows emit all >> buffered records at the end of the stream. I have two questions about this: >> >> 1) Is this only the case for streams created with .fromElements() or >> does this happen in any streaming application on shutdown? >> >> 2) Is there a configuration option to disable this behaviour, such that >> buffered events remaining in windows are just discarded? >> >> In our application it is critical, that only events, which were >> explicitly fired are emitted from the windows. >> >> Cheers and thank you, >> >> Konstantin >> >> -- >> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182 >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke >> Sitz: Unterföhring * Amtsgericht München * HRB 135082 > > -- Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082
Re: Behaviour of Windows on Shutdown
Hi, did the first mail from me not arrive? I’m sending it again: we changed it a while back to not emit any buffered elements at the end because we noticed that that would be a more natural behavior. This must be an oversight on our part. I’ll make sure that the 1.0 release will have the correct behavior. > On 17 Feb 2016, at 16:18, Konstantin Knauf > wrote: > > Hi everyone, > > if a DataStream is created with .fromElements(...) all windows emit all > buffered records at the end of the stream. I have two questions about this: > > 1) Is this only the case for streams created with .fromElements() or > does this happen in any streaming application on shutdown? > > 2) Is there a configuration option to disable this behaviour, such that > buffered events remaining in windows are just discarded? > > In our application it is critical, that only events, which were > explicitly fired are emitted from the windows. > > Cheers and thank you, > > Konstantin >
Re: where can get the summary changes between flink-1.0 and flink-0.10
Hi Chiwan, Thank you for the instant reply. When will the official flink-1.0 be released? Can you give a rough estimate? I am interested in the new features of flink-1.0, like operator uid, in order to solve my current problem. Regards, Zhijiang Wang
Behaviour of Windows on Shutdown
Hi everyone, if a DataStream is created with .fromElements(...), all windows emit all buffered records at the end of the stream. I have two questions about this: 1) Is this only the case for streams created with .fromElements(), or does this happen in any streaming application on shutdown? 2) Is there a configuration option to disable this behaviour, such that buffered events remaining in windows are just discarded? In our application it is critical that only events which were explicitly fired are emitted from the windows. Cheers and thank you, Konstantin
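To make the two end-of-stream policies in question 2) concrete, here is a toy count window in Python (purely illustrative, not the Flink operator): on close it either flushes or discards whatever is still buffered, the latter being the behaviour this application needs.

```python
class CountWindow:
    """Toy count window with a configurable end-of-stream policy."""

    def __init__(self, size, flush_on_close):
        self.size, self.flush_on_close = size, flush_on_close
        self.buffer, self.fired = [], []

    def element(self, e):
        self.buffer.append(e)
        if len(self.buffer) == self.size:       # explicit fire
            self.fired.append(list(self.buffer))
            self.buffer.clear()

    def close(self):
        # End of stream: emit the partial window only when flushing;
        # otherwise the remaining buffered elements are discarded.
        if self.flush_on_close and self.buffer:
            self.fired.append(list(self.buffer))
        self.buffer.clear()
        return self.fired
```

With `flush_on_close=False`, only windows that fired explicitly ever produce output, which matches the behaviour Aljoscha says was the intended one.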
Re: Flink packaging makes life hard for SBT fat jar's
Hi! I know that Till is currently looking into making the SBT experience better. He should have an update in a bit. We need to check a few corner cases about how SBT and Maven dependencies and types (provided, etc.) interact and come up with a plan. We'll also add an SBT quickstart to the homepage as a result of this, to help make this easier. Greetings, Stephan On Mon, Feb 15, 2016 at 3:41 PM, shikhar wrote: > Stephan Ewen wrote > > Do you know why you are getting conflicts on the FashHashMap class, even > > though the core Flink dependencies are "provided"? Does adding the Kafka > > connector pull in all the core Flink dependencies? > > Yes, the core Flink dependencies are being pulled in transitively from the > Kafka connector. > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-packaging-makes-life-hard-for-SBT-fat-jar-s-tp4897p4924.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. >
Re: Dataset filter improvement
Hi Max, why do I need to register them? My job also runs without problems without that. The only problem with my POJOs was that I had to implement equals and hashCode correctly; Flink didn't force me to do it, but then the results were wrong :( On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels wrote: > Hi Flavio, > > Stephan was referring to > > env.registerType(ExtendedClass1.class); > env.registerType(ExtendedClass2.class); > > Cheers, > Max > > On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier > wrote: > > What do you mean exactly? Probably I'm missing something here... remember > > that I can specify the right subclass only after the last flatMap; after the > > first map neither I nor Flink can know the exact subclass of BaseClass. > > > > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen wrote: > >> > >> Class hierarchies should definitely work, even if the base class has no > >> fields. > >> > >> They work more efficiently if you register the subclasses at the execution > >> environment (Flink cannot infer them from the function signatures because > >> the function signatures only contain the abstract base class). > >> > >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier > >> wrote: > >>> > >>> Because the classes are not related to each other. Do you think it's a > >>> good idea to have something like this? > >>> > >>> abstract class BaseClass { > >>>String someField; > >>> } > >>> > >>> class ExtendedClass1 extends BaseClass { > >>>String someOtherField11; > >>>String someOtherField12; > >>>String someOtherField13; > >>> ... > >>> } > >>> > >>> class ExtendedClass2 extends BaseClass { > >>>Integer someOtherField21; > >>>Double someOtherField22; > >>>Integer someOtherField23; > >>> ... > >>> } > >>> > >>> and then declare my map as Map, and then apply a > >>> flatMap that can be used to generate the specific datasets? > >>> Doesn't this cause problems for Flink? 
Classes can be very different from > >>> each other... maybe this can cause problems with the plan generation, couldn't > >>> it? > >>> > >>> Thanks Fabian and Stephan for the support! > >>> > >>> > >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen > wrote: > > Why not use an abstract base class and N subclasses? > > On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske > wrote: > > > > Unfortunately, there is no Either<1,...,n>. > > You could implement something like a Tuple3<Option<A>, > > Option<B>, Option<C>>. However, Flink does not provide an Option > > type (it comes with Java 8). You would need to implement it yourself, incl. > > TypeInfo and Serializer. You can get some inspiration from the Either type > > info/serializer, if you want to go this way. > > > > Using a byte array would also work but doesn't look much easier than > > the Option approach to me. > > > > 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier : > >> > >> Yes, the intermediate datasets I create are then joined again between > >> themselves. What I'd need is an Either<1,...,n>. Is that possible to add? > >> Otherwise I was thinking to generate a Tuple2 and in > >> the subsequent filter+map/flatMap deserialize only those elements I want to > >> group together (e.g. t.f0=="someEventType") in order to generate the typed > >> dataset. > >> Which one do you think is the best solution? > >> > >> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske > >> wrote: > >>> > >>> Hi Flavio, > >>> > >>> I did not completely understand which objects should go where, but > >>> here are some general guidelines: > >>> > >>> - early filtering is mostly a good idea (unless evaluating the filter > >>> expression is very expensive) > >>> - you can use a flatMap function to combine a map and a filter > >>> - applying multiple functions on the same data set does not > >>> necessarily materialize the data set (in memory or on disk). In most cases > >>> it prevents chaining, hence there is serialization overhead. 
In > some cases > >>> where the forked data streams are joined again, the data set must be > >>> materialized in order to avoid deadlocks. > >>> - it is not possible to write a map that generates two different > >>> types, but you could implement a mapper that returns an Either<First, > >>> Second> type. > >>> > >>> Hope this helps, > >>> Fabian > >>> > >>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier : > > Any help on this? > > On 9 Feb 2016 18:03, "Flavio Pompermaier" > wrote: > > > > Hi to all, > > > > in my program I have a Dataset that generates different types of > > objects depending on the incoming element. > > Thus it's like a Map. > > In order to type the different generated datasets I do something like: > > > > Dataset start =... > > > >>
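The tag-then-split idea from this thread (serialize each element with a type tag, then route it to its typed dataset in a later flatMap) can be sketched in a few lines of Python. This is illustrative only; the event-type tags are made up, and in Flink each bucket would be a typed DataSet rather than a list.

```python
def split_by_type(tagged_records):
    """One pass over mixed data: route each (tag, payload) pair into a
    per-type bucket, like a flatMap fanning out to typed datasets."""
    buckets = {}
    for tag, payload in tagged_records:
        buckets.setdefault(tag, []).append(payload)
    return buckets
```

The Either/Option approaches discussed above serve the same purpose with static typing: the tag becomes which side of the Either (or which Option) is populated.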
Re: 1.0-SNAPSHOT downloads
Thanks Max! I didn't notice that section of the docs. I've been using the RC0 and it's working well. -Zach On Mon, Feb 15, 2016 at 11:36 AM Maximilian Michels wrote: > Hi Zach, > > Here you go: > http://flink.apache.org/contribute-code.html#snapshots-nightly-builds > > Cheers, > Max > > On Mon, Feb 15, 2016 at 6:29 PM, Zach Cox wrote: > > Hi - are there binary downloads of the Flink 1.0-SNAPSHOT tarballs, like > > there are for 0.10.2 [1]? I'm testing out an application built against > the > > 1.0-SNAPSHOT dependencies from Maven central, and want to make sure I run > > them on a Flink 1.0-SNAPSHOT cluster that matches up with those jars. > > > > Thanks, > > Zach > > > > [1] http://flink.apache.org/downloads.html > > >
Re: streaming hdfs sub folders
The program is a DataStream program; it usually gets the data from kafka. It's an anomaly detection program that learns from the stream itself. The reason I want to read from files is to test different settings of the algorithm and compare them. I think I don't need to replay things in the exact order (which is not possible with parallel reads anyway) and I have written the program so it can deal with out-of-order events. I only need the subfolders to be processed roughly in order. It's fine to process some stuff from 01 before everything from 00 is finished; if I get records from all 24 subfolders at the same time things will break though. If I set the flag, will it try to get data from all subdirs in parallel or will it go subdir by subdir? Also, can you point me to some documentation or something where I can see how to set the flag? cheers Martin On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen wrote: > Hi! > > Going through nested folders is pretty simple; there is a flag on the > FileInputFormat that makes sure those are read. > > Tricky is the part that all "00" files should be read before the "01" > files. If you still want parallel reads, that means you need to sync at > some point, wait for all parallel parts to finish with the "00" work before > anyone may start with the "01" work. > > Is your training program a DataStream or a DataSet program? > > Stephan > > On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann wrote: > >> Hi, >> >> I have a streaming machine learning job that usually runs with input from >> kafka. To tweak the models I need to run on some old data from HDFS. >> >> Unfortunately the data on HDFS is spread out over several subfolders. >> Basically I have a date folder with one subfolder for each hour; within those are >> the actual input files I'm interested in. >> >> Basically what I need is a source that goes through the subfolders in >> order and streams the files into the program. 
I'm using event timestamps so >> all files in 00 need to be processed before 01. >> >> Has anyone an idea on how to do this? >> >> cheers Martin >> >> >
Problem with Kafka 0.9 Client
Hi guys, We are using Flink 1.0-SNAPSHOT with Kafka 0.9 Consumer and we have not been able to retrieve data from our Kafka Cluster. The DEBUG data reports the following: 10:53:24,365 DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1673,client_id=flink_test}, body={topics=[stream_test_3]}), isInitiatedByNetworkClient, createdTimeMs=1455702804364, sendTimeMs=0) to node 35 10:53:24,398 DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 838 to Cluster(nodes = [Node(41, ip-.eu-west-1.compute.internal, 9092), Node(35, ip-.eu-west-1.compute.internal, 9092), Node(87, ip-.eu-west-1.compute.internal, 9092)], partitions = [Partition(topic = stream_test_3, partition = 0, leader = 87, replicas = [87,41,35,], isr = [87,41,35,], Partition(topic = stream_test_3, partition = 1, leader = 35, replicas = [35,41,87,], isr = [35,41,87,], Partition(topic = stream_test_3, partition = 4, leader = 87, replicas = [87,41,35,], isr = [87,41,35,], Partition(topic = stream_test_3, partition = 3, leader = 35, replicas = [35,87,41,], isr = [35,87,41,], Partition(topic = stream_test_3, partition = 2, leader = 41, replicas = [41,87,35,], isr = [41,87,35,]]) 10:53:24,398 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Issuing group metadata request to broker 35 10:53:24,432 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Group metadata response ClientResponse(receivedTimeMs=1455702804432, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@63b68d94, request=RequestSend(header={api_key=10,api_version=0,correlation_id=1674,client_id=flink_test}, body={group_id=test}), createdTimeMs=1455702804398, sendTimeMs=1455702804398), 
responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) We receive this message all the time. What we don't understand is this "responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}"; since we see an error_code, we suppose there was a problem. Our Kafka cluster works and we have some clients extracting data from it, so we don't know if this could be a Kafka issue or a Flink issue. Does anyone know, or understand, this response we are getting from Kafka? Thanks.
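For context, error_code 15 in the Kafka 0.9 protocol is GROUP_COORDINATOR_NOT_AVAILABLE: the broker that should coordinate the consumer group (backed by the internal __consumer_offsets topic) is not available yet, which is also why the response carries node_id=-1 for the coordinator. A small self-contained Java sketch decoding a few of these codes (the lookup table here is illustrative, not part of the Kafka client):

```java
import java.util.Map;

public class KafkaErrorCodes {
    // A few protocol error codes, named per the Kafka protocol guide.
    // This table is illustrative and intentionally incomplete.
    private static final Map<Short, String> CODES = Map.of(
        (short) 0, "NONE",
        (short) 15, "GROUP_COORDINATOR_NOT_AVAILABLE",
        (short) 16, "NOT_COORDINATOR_FOR_GROUP");

    public static String describe(short code) {
        return CODES.getOrDefault(code, "UNKNOWN(" + code + ")");
    }

    public static void main(String[] args) {
        // The code seen in the responseBody above:
        System.out.println(describe((short) 15));
    }
}
```

Since code 15 is a retriable error, the consumer keeps re-issuing the group metadata request, which matches "we receive this message all the time".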
Re: IOException when trying flink-twitter example
Looks like an issue with the Twitter Client. Maybe the log reveals more that can help you figure out what is happening (loss of connection, etc). On Mon, Feb 15, 2016 at 1:32 PM, ram kumar wrote: > org.apache.flink.streaming.connectors.twitter.TwitterFilterSource - > Initializing Twitter Streaming API connection > 12:27:32,134 INFO > com.twitter.hbc.httpclient.BasicClient- New > connection executed: twitterSourceClient, endpoint: > /1.1/statuses/filter.json?delimited=length > 12:27:32,135 INFO > org.apache.flink.streaming.connectors.twitter.TwitterFilterSource - > Twitter Streaming API connection established successfully > 12:27:32,233 INFO > com.twitter.hbc.httpclient.ClientBase - > twitterSourceClient Establishing a connection > 12:27:32,296 WARN > com.twitter.hbc.httpclient.ClientBase - > twitterSourceClient IOException caught when establishing connection to > https://stream.twitter.com/1.1/statuses/filter.json?delimited=length > > When i run the job, at first the connection is established, > then this log appears > > 08:25:19,207 WARN > com.twitter.hbc.httpclient.ClientBase - > twitterSourceClient failed to establish connection properly > 08:25:19,207 INFO > com.twitter.hbc.httpclient.ClientBase - > twitterSourceClient Done processing, preparing to close connection > 08:25:19,208 INFO > com.twitter.hbc.httpclient.ClientBase - > twitterSourceClient Establishing a connection > 08:25:19,225 WARN > com.twitter.hbc.httpclient.ClientBase - > twitterSourceClient IOException caught when establishing connection to > https://stream.twitter.com/1.1/statuses/filter.json?delimited=length > > > On Thu, Feb 11, 2016 at 2:55 PM, Maximilian Michels > wrote: > >> Hi Ram, >> >> This is an Exception thrown in the Twitter client. Unfortunately the >> exception is caught and not printed to the screen. >> >> I would suggest to set a breakpoint in the ClientBase to find out >> about the reason for the failed connection. 
>> >> Cheers, >> Max >> >> On Thu, Feb 11, 2016 at 9:30 AM, ram kumar >> wrote: >> > Hi, >> > >> > Got following exception in taskmanager logs when running >> > TwitterFilterSourceExample, >> > >> > 08:25:19,207 WARN com.twitter.hbc.httpclient.ClientBase >> > - twitterSourceClient failed to establish connection properly >> > 08:25:19,207 INFO com.twitter.hbc.httpclient.ClientBase >> > - twitterSourceClient Done processing, preparing to close connection >> > 08:25:19,208 INFO com.twitter.hbc.httpclient.ClientBase >> > - twitterSourceClient Establishing a connection >> > 08:25:19,225 WARN com.twitter.hbc.httpclient.ClientBase >> > - twitterSourceClient IOException caught when establishing connection to >> > https://stream.twitter.com/1.1/statuses/filter.json?delimited=length >> > >> > >> > >> > Anyone faced this same issue? >> > >> > Thanks >> > >
Re: Behaviour of Windows on Shutdown
Hi, we changed it a while back to not emit any buffered elements at the end because we noticed that that would be a more natural behavior. This must be an oversight on our part. I’ll make sure that the 1.0 release will have the correct behavior. > On 17 Feb 2016, at 16:35, Konstantin Knauf > wrote: > > Hi everyone, > > if a DataStream is created with .fromElements(...) all windows emit all > buffered records at the end of the stream. I have two questions about this: > > 1) Is this only the case for streams created with .fromElements() or > does this happen in any streaming application on shutdown? > > 2) Is there a configuration option to disable this behaviour, such that > buffered events remaining in windows are just discarded? > > In our application it is critical, that only events, which were > explicitly fired are emitted from the windows. > > Cheers and thank you, > > Konstantin > > -- > Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182 > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke > Sitz: Unterföhring * Amtsgericht München * HRB 135082
Re: Flink packaging makes life hard for SBT fat jar's
This seems to work to generate the assembly, hopefully not missing any required transitive deps: ``` "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", "org.apache.kafka" %% "kafka" % "0.8.2.2", ("org.apache.flink" %% "flink-connector-kafka-base" % flinkVersion).intransitive(), ("org.apache.flink" %% "flink-connector-kafka-0.8" % flinkVersion).intransitive(), ``` -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-packaging-makes-life-hard-for-SBT-fat-jar-s-tp4897p4964.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
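One caveat with marking the Flink dependencies as "provided": a plain `sbt run` will then fail locally, because provided jars are excluded from the run classpath. The commonly used sbt-assembly workaround is to put them back for the run task — a sketch, assuming sbt 0.13-style keys:

```scala
// Re-add "provided" dependencies to the run classpath so `sbt run`
// still works locally while assembly keeps them out of the fat jar.
// (sbt 0.13-style syntax; adjust for your sbt version.)
run in Compile := Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
).evaluated
```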
Re: Finding the average temperature
Hi, the name pre-aggregation is a bit misleading. I have started calling it incremental aggregation because it does not work like a combiner. What it does is to incrementally fold (or reduce) elements as they arrive at the window operator. This reduces the amount of required space, because, otherwise, all the elements would have to be stored before the window is triggered. When using an incremental fold (or reduce) the WindowFunction only gets the one final result of the incremental aggregation. Cheers, Aljoscha > On 17 Feb 2016, at 09:27, Stefano Baghino > wrote: > > Hi Nirmalaya, > > my reply was based on me misreading your original post, thinking you had a > batch of data, not a stream. I see that the apply method can also take a > reducer the pre-aggregates your data before passing it to the window > function. I suspect that pre-aggregation runs locally just like a combiner > would, but I'm really not sure about it. We should have more feedback on this > regard. > > On Tue, Feb 16, 2016 at 2:19 AM, Nirmalya Sengupta > wrote: > Hello Stefano > > Sorry for the late reply. Many thanks for taking effort to write and share an > example code snippet. > > I have been playing with the countWindow behaviour for some weeks now and I > am generally aware of the functionality of countWindowAll(). For my useCase, > where I _have to observe_ the entire stream as it founts in, using > countWindowAll() is probably the most obvious solution. This is what you > recommend too. However, because this is going to use 1 thread only (or 1 node > only in a cluster), I was thinking about ways to make use of the > 'distributedness' of the framework. Hence, my question. > > Your reply leads to me read and think a bit more. If I have to use > parallelism to achieve what I want to achieve, I think managing a ValueState > of my own is possibly the solution. If you have any other thoughts, please > share. > > From your earlier response: '...
you can still enjoy a high level of > parallelism up until the last operator by using a combiner, which is > basically a reducer that operates locally ...'. Could you elaborate this a > bit, whenever you have time? > > -- Nirmalya > > -- > Software Technologist > http://www.linkedin.com/in/nirmalyasengupta > "If you have built castles in the air, your work need not be lost. That is > where they should be. > Now put the foundation under them." > > > > -- > BR, > Stefano Baghino > > Software Engineer @ Radicalbit
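Aljoscha's description can be pictured in plain Java, outside of Flink: an incremental aggregate keeps only a running (sum, count) accumulator per window instead of buffering every element, and the final "window function" only ever sees that accumulator. A toy sketch of the idea (names here are illustrative, not Flink API):

```java
import java.util.List;

public class IncrementalAverage {
    // Incremental fold: as each element "arrives", only the running
    // {sum, count} accumulator is kept, never the elements themselves.
    static double[] foldTemperature(List<Double> readings) {
        double sum = 0.0;
        long count = 0;
        for (double r : readings) {
            sum += r;
            count++;
        }
        return new double[] { sum, count };
    }

    // The "window function" receives only the final accumulator.
    static double average(double[] acc) {
        return acc[0] / acc[1];
    }

    public static void main(String[] args) {
        double[] acc = foldTemperature(List.of(20.0, 22.0, 24.0));
        System.out.println(average(acc)); // 22.0
    }
}
```

Unlike a combiner, nothing here runs upstream of the (window) operator; the space saving comes purely from folding on arrival.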
Changing parallelism
Hi - we are building a stateful Flink streaming job that will run indefinitely. One part of the job builds up state per key in a global window that will need to exist for a very long time. We will definitely be using the savepoints to restore job state after new code deploys. We were planning to be able to increase the parallelism of the job incrementally over time, as the volume of input data grows. We also have a large amount of historical data loaded into Kafka we'd like to reprocess initially with the streaming job to backfill Elasticsearch, and then transition the job seamlessly to nearline processing. We were planning to use a large parallelism during the historical reprocessing, and then decrease it when the job has caught up to new events. However, the savepoint docs state that the job parallelism cannot be changed over time [1]. Does this mean we need to use the same, fixed parallelism=n during reprocessing and going forward? Are there any tricks or workarounds we could use to still make changes to parallelism and take advantage of savepoints? Thanks, Zach [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html#current-limitations
Behaviour of Windows on Shutdown
Hi everyone, if a DataStream is created with .fromElements(...) all windows emit all buffered records at the end of the stream. I have two questions about this: 1) Is this only the case for streams created with .fromElements() or does this happen in any streaming application on shutdown? 2) Is there a configuration option to disable this behaviour, such that buffered events remaining in windows are just discarded? In our application it is critical, that only events, which were explicitly fired are emitted from the windows. Cheers and thank you, Konstantin -- Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082
Re: Read once input data?
In my use case I thought to persist the dataset on Tachyon to reuse it, in order to speed up its reading. Do you think it could help? On Tue, Feb 16, 2016 at 10:28 PM, Saliya Ekanayake wrote: > Thank you. I'll check this > > On Tue, Feb 16, 2016 at 4:01 PM, Fabian Hueske wrote: > >> Broadcasted DataSets are stored on the JVM heap of each task manager (but >> shared among multiple slots on the same TM), hence the size restriction. >> >> There are two ways to retrieve a DataSet (such as the result of a reduce). >> 1) if you want to fetch the result into your client program use >> DataSet.collect(). This immediately triggers an execution and fetches the >> result from the cluster. >> 2) if you want to use the result for a computation in the cluster use >> broadcast sets as described above. >> >> 2016-02-16 21:54 GMT+01:00 Saliya Ekanayake : >> >>> Thank you, yes, this makes sense. The broadcasted data in my case would >>> a large array of 3D coordinates, >>> >>> On a side note, how can I take the output from a reduce function? I can >>> see methods to write it to a given output, but is it possible to retrieve >>> the reduced result back to the program - like a double value representing >>> the average in the previous example. >>> >>> >>> On Tue, Feb 16, 2016 at 3:47 PM, Fabian Hueske >>> wrote: >>> You can use so-called BroadcastSets to send any sufficiently small DataSet (such as a computed average) to any other function and use it there. However, in your case you'll end up with a data flow that branches (at the source) and merges again (when the average is send to the second map). Such patterns can cause deadlocks and can therefore not be pipelined which means that the data before the branch is written to disk and read again. In your case it might be even better to read the data twice instead of reading, writing, and reading it.
Fabian 2016-02-16 21:15 GMT+01:00 Saliya Ekanayake : > I looked at the samples and I think what you meant is clear, but I > didn't find a solution for my need. In my case, I want to use the result > from first map operation before I can apply the second map on the > *same* data set. For simplicity, let's say I've a bunch of short > values represented as my data set. Then I need to find their average, so I > use a map and reduce. Then I want to map these short values with another > function, but it needs that average computed in the beginning to work > correctly. > > Is this possible without doing multiple reads of the input data to > create the same dataset? > > Thank you, > saliya > > On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske > wrote: > >> Yes, if you implement both maps in a single job, data is read once. >> >> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake : >> >>> Fabian, >>> >>> I've a quick follow-up question on what you suggested. When >>> streaming the same data through different maps, were you implying that >>> everything goes as single job in Flink, so data read happens only once? >>> >>> Thanks, >>> Saliya >>> >>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske >>> wrote: >>> It is not possible to "pin" data sets in memory, yet. However, you can stream the same data set through two different mappers at the same time. For instance you can have a job like: /---> Map 1 --> SInk1 Source --< \---> Map 2 --> SInk2 and execute it at once. For that you define you data flow and call execute once after all sinks have been created. Best, Fabian 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake : > Fabian, > > count() was just an example. What I would like to do is say run > two map operations on the dataset (ds). Each map will have it's own > reduction, so is there a way to avoid creating two jobs for such > scenario? > > The reason is, reading these binary matrices are expensive. 
In our > current MPI implementation, I am using memory maps for faster loading > and > reuse. > > Thank you, > Saliya > > On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske > wrote: > >> Hi, >> >> it looks like you are executing two distinct Flink jobs. >> DataSet.count() triggers the execution of a new job. If you have >> an execute() call in your program, this will lead to two Flink jobs >> being >> executed. >> It is not possible to share state among these jobs. >> >> Maybe you should add a custom count implementation (using a >> ReduceFunction) which is executed in the same program.
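Fabian's trade-off — reading the data twice versus materializing a branched flow — can be sketched in plain Java: pass one reduces the data set to an average, pass two maps each value using that result (this is the spot where Flink would need a broadcast set; names and data below are made up for illustration):

```java
import java.util.List;
import java.util.stream.Collectors;

public class TwoPass {
    // Pass 1 (first read): reduce the data set to its average.
    static double average(List<Short> data) {
        return data.stream().mapToInt(Short::intValue).average().orElse(0.0);
    }

    // Pass 2 (second read): map every value using the pass-1 result.
    static List<Double> centered(List<Short> data, double avg) {
        return data.stream().map(v -> v - avg).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Short> data = List.of((short) 1, (short) 2, (short) 3);
        double avg = average(data);              // first pass over the data
        System.out.println(centered(data, avg)); // second pass over the data
    }
}
```

In a single Flink program the two passes become two operators over the same source, with the average shipped to the second map as a broadcast set rather than via a local variable.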
Re: streaming hdfs sub folders
Hi! Going through nested folders is pretty simple: there is a flag on the FileInputFormat that makes sure those are read. The tricky part is that all "00" files should be read before the "01" files. If you still want parallel reads, that means you need to sync at some point, wait for all parallel parts to finish with the "00" work before anyone may start with the "01" work. Is your training program a DataStream or a DataSet program? Stephan On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann wrote: > Hi, > > I have a streaming machine learning job that usually runs with input from > kafka. To tweak the models I need to run on some old data from HDFS. > > Unfortunately the data on HDFS is spread out over several subfolders. > Basically I have a datum with one subfolder for each hour within those are > the actual input files I'm interested in. > > Basically what I need is a source that goes through the subfolder in order > and streams the files into the program. I'm using event timestamps so all > files in 00 need to be processed before 01. > > Has anyone an idea on how to do this? > > cheers Martin > >
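Independent of which Flink API is used, the ordering constraint boils down to draining the hour folders in lexicographic order, one fully before the next. A plain-Java sketch of just that sequencing (folder and file names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HourlyFolders {
    // Flatten a folder -> files map into a single list, in strict
    // lexicographic folder order ("00" fully drained before "01", ...).
    static List<String> filesInHourOrder(Map<String, List<String>> byHour) {
        List<String> ordered = new ArrayList<>();
        // TreeMap iterates its keys in sorted order.
        new TreeMap<>(byHour).forEach((hour, files) -> ordered.addAll(files));
        return ordered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> byHour = Map.of(
            "01", List.of("01/a.log"),
            "00", List.of("00/a.log", "00/b.log"));
        System.out.println(filesInHourOrder(byHour));
    }
}
```

Reading sequentially like this gives the event-time ordering for free; the hard part Stephan describes only appears once the files within one hour are read in parallel and a barrier is needed between hours.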
Re: Problem with KeyedStream 1.0-SNAPSHOT
Hi Fabian, Thanks a lot, it worked. On 15 February 2016 at 12:42, Fabian Hueske wrote: > Hi Javier, > > Keys is an internal class and was recently moved to a different package. > So it appears like your Flink dependencies are not aligned to the same > version. > > We also added Scala version identifiers to all our dependencies which > depend on Scala 2.10. > For instance, flink-scala became flink-scala_2.10. > > Can you check if you need to update some of your dependencies? > See this wiki page [1] for a list of all changed dependencies. > If this is not the problem, I would try to update all Flink dependencies. > > Cheers, Fabian > > [1] > https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version > > 2016-02-15 10:54 GMT+01:00 Lopez, Javier : > >> Hi guys, >> >> I'm running a small test with the SNAPSHOT version in order to be able to >> use Kafka 0.9 and I'm getting the following error: >> >> *cannot access org.apache.flink.api.java.operators.Keys* >> *[ERROR] class file for org.apache.flink.api.java.operators.Keys not >> found* >> >> The code I'm using is as follows: >> >> *DataStream*<*String*> messageStream = env.addSource(new >> *FlinkKafkaConsumer09*<>("stream_test_6", new *SimpleStringSchema*(), >> properties)); >> >> *DataStream*<*Tuple2*<*String*, *Double*>> messageStreamObj = >> messageStream.map(new *MapFunction*<*String*, *Tuple2*<*String*, *Double*>>() >> { >> private static final long serialVersionUID = -6867736771747690202L; >> >> @Override >> public *Tuple2*<*String*, *Double*> map(*String *value) throws *Exception >> *{ >> *JSONParser *jsonParser = new *JSONParser*(); >> *JSONObject *jsonObject = (*JSONObject*) jsonParser.parse(value); >> *JSONObject *metaData = (*JSONObject*) jsonObject.get("metadata"); >> return new *Tuple2*<*String*, *Double*>((*String*)metaData.get("eid"), >> *Double*.parseDouble((*String*)jsonObject.get("item_price"))); >> } >> }); >> >> *KeyedStream*<*Tuple2*<*String*, *Double*>,?> 
keyStream = >> messageStreamObj.keyBy(0); >> >> Maven throws the error when trying to get the KeyedStream from the >> DataStream. I know that this class (operator.Keys) is depreciated but I >> don't know why it's been used by the function keyBy(int). >> >> Also, for reference, I'm using this >> version: 1.0-SNAPSHOT >> >> Do you have any idea why this happens? >> > >
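For Maven users hitting the same "cannot access ... Keys" symptom, the fix Fabian describes usually means aligning every Flink artifact on the same Scala-suffixed name and version — e.g. a pom fragment along these lines (the exact artifact list and version depend on your project):

```xml
<!-- All Flink artifacts that depend on Scala now carry a _2.10 suffix;
     mixing suffixed and unsuffixed (or differently versioned) artifacts
     produces exactly this kind of missing-class error. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.10</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
```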
Re: Finding the average temperature
Hi Nirmalaya, my reply was based on me misreading your original post, thinking you had a batch of data, not a stream. I see that the apply method can also take a reducer that pre-aggregates your data before passing it to the window function. I suspect that pre-aggregation runs locally just like a combiner would, but I'm really not sure about it. We should have more feedback in this regard. On Tue, Feb 16, 2016 at 2:19 AM, Nirmalya Sengupta < sengupta.nirma...@gmail.com> wrote: > Hello Stefano > > Sorry for the late reply. Many thanks for taking effort to write and share > an example code snippet. > > I have been playing with the countWindow behaviour for some weeks now and > I am generally aware of the functionality of countWindowAll(). For my > useCase, where I _have to observe_ the entire stream as it founts in, using > countWindowAll() is probably the most obvious solution. This is what you > recommend too. However, because this is going to use 1 thread only (or 1 > node only in a cluster), I was thinking about ways to make use of the > 'distributedness' of the framework. Hence, my question. > > Your reply leads to me read and think a bit more. If I have to use > parallelism to achieve what I want to achieve, I think managing a > ValueState of my own is possibly the solution. If you have any other > thoughts, please share. > > From your earlier response: '... you can still enjoy a high level of > parallelism up until the last operator by using a combiner, which is > basically a reducer that operates locally ...'. Could you elaborate this a > bit, whenever you have time? > > -- Nirmalya > > -- > Software Technologist > http://www.linkedin.com/in/nirmalyasengupta > "If you have built castles in the air, your work need not be lost. That is > where they should be. > Now put the foundation under them." > -- BR, Stefano Baghino Software Engineer @ Radicalbit
Re: where can get the summary changes between flink-1.0 and flink-0.10
Yeah, we should definitely do a guide of changes between 0.10 and 1.0 On Wed, Feb 17, 2016 at 7:43 AM, Chiwan Park wrote: > Hi Zhijiang, > > We have wiki pages about description of Flink 1.0 relesase [1] [2]. But > the pages are not updated in realtime. It is possible that there are some > changes that haven’t been described. > > After releasing 1.0 officially, maybe we post an article dealing with the > changes in 1.0 to the Flink blog [3]. > > Regards, > Chiwan Park > > [1]: https://cwiki.apache.org/confluence/display/FLINK/1.0+Release > [2]: > https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version > [3]: http://flink.apache.org/blog/ > > > On Feb 17, 2016, at 3:34 PM, wangzhijiang999 > wrote: > > > > Hi, > > Where can get the summary changes between flink-1.0 and flink-0.10, > thank you in advance! > > > > > > > > > > > > Best Regards, > > > > Zhijiang Wang > >
Re: Regarding Concurrent Modification Exception
Hi Biplob, Could you please supply some sample code? Otherwise it is tough to debug this problem. Cheers, Max On Tue, Feb 16, 2016 at 2:46 PM, Biplob Biswas wrote: > Hi, > > No, we don't start a flink job inside another job, although the job creation > was done in a loop, but only when one job is finished the next job started > after cleanup. And we didn't get this exception on my local flink > installation, it appears when i run on the cluster. > > Thanks & Regards > Biplob Biswas > > On Mon, Feb 15, 2016 at 12:25 PM, Fabian Hueske wrote: >> >> Hi, >> >> This stacktrace looks really suspicious. >> It includes classes from the submission client (CLIClient), optimizer >> (JobGraphGenerator), and runtime (KryoSerializer). >> >> Is it possible that you try to start a new Flink job inside another job? >> This would not work. >> >> Best, Fabian > >
Re: Dataset filter improvement
Hi Flavio, Stephan was referring to env.registerType(ExtendedClass1.class); env.registerType(ExtendedClass2.class); Cheers, Max On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier wrote: > What do you mean exactly..? Probably I'm missing something here..remember > that I can specify the right subClass only after the last flatMap, after the > first map neither me nor Flink can know the exact subclass of BaseClass > > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen wrote: >> >> Class hierarchies should definitely work, even if the base class has no >> fields. >> >> They work more efficiently if you register the subclasses at the execution >> environment (Flink cannot infer them from the function signatures because >> the function signatures only contain the abstract base class). >> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier >> wrote: >>> >>> Because The classes are not related to each other. Do you think it's a >>> good idea to have something like this? >>> >>> abstract class BaseClass(){ >>>String someField; >>> } >>> >>> class ExtendedClass1 extends BaseClass (){ >>>String someOtherField11; >>>String someOtherField12; >>>String someOtherField13; >>> ... >>> } >>> >>> class ExtendedClass2 extends BaseClass (){ >>>Integer someOtherField21; >>>Double someOtherField22; >>>Integer someOtherField23; >>> ... >>> } >>> >>> and then declare my map as Map. and then apply a >>> flatMap that can be used to generated the specific datasets? >>> Doesn't this cause problem to Flink? Classes can be vrry different to >>> each other..maybe this can cause problems with the plan generation..isn't >>> it? >>> >>> Thanks Fabian and Stephan for the support! >>> >>> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen wrote: Why not use an abstract base class and N subclasses? On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske wrote: > > Unfortunately, there is no Either<1,...,n>. > You could implement something like a Tuple3, > Option, Option>. 
However, Flink does not provide an Option > type (comes with Java8). You would need to implement it yourself incl. > TypeInfo and Serializer. You can get some inspiration from the Either type > info /serializer, if you want to go this way. > > Using a byte array would also work but doesn't look much easier than > the Option approach to me. > > 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier : >> >> Yes, the intermediate dataset I create then join again between >> themselves. What I'd need is a Either<1,...,n>. Is that possible to add? >> Otherwise I was thinking to generate a Tuple2 and in >> the subsequent filter+map/flatMap deserialize only those elements I want >> to >> group togheter (e.g. t.f0=="someEventType") in order to generate the >> typed >> dataset based. >> Which one do you think is the best solution? >> >> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske >> wrote: >>> >>> Hi Flavio, >>> >>> I did not completely understand which objects should go where, but >>> here are some general guidelines: >>> >>> - early filtering is mostly a good idea (unless evaluating the filter >>> expression is very expensive) >>> - you can use a flatMap function to combine a map and a filter >>> - applying multiple functions on the same data set does not >>> necessarily materialize the data set (in memory or on disk). In most >>> cases >>> it prevents chaining, hence there is serialization overhead. In some >>> cases >>> where the forked data streams are joined again, the data set must be >>> materialized in order to avoid deadlocks. >>> - it is not possible to write a map that generates two different >>> types, but you could implement a mapper that returns an Either>> Second> type. >>> >>> Hope this helps, >>> Fabian >>> >>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier : Any help on this? On 9 Feb 2016 18:03, "Flavio Pompermaier" wrote: > > Hi to all, > > in my program I have a Dataset that generated different types of > object wrt the incoming element. > Thus it's like a Map. 
> In order to type the different generated datasets I do something: > > Dataset start =... > > Dataset ds1 = start.filter().map(..); > Dataset ds2 = start.filter().map(..); > Dataset ds3 = start.filter().map(..); > Dataset ds4 = start.filter().map(..); > > However this is very inefficient (I think because Flink needs to > materialize the entire source dataset for every slot). > > It's much more efficient to group the generation of objects of the > same type. E.g.: > > Dataset start =.. > > Dataset tmp1 = start.map(..);