Re: Periodic full stream aggregations
Hey,

The current code supports two types of aggregations: a simple binary reduce, T,T => T, and the grouped version of it, where the reduce function is applied per user-defined key (so there we keep a map of reduced values). This can already be used to implement fairly complex logic if we transform the data to a proper type before passing it to the reducer.

As a next step we can make this work with fold + combiners as well, where your initial data type is T, your fold function is T,R => R, and a combiner is R,R => R. At that point I think any sensible aggregation can be implemented.

Regards,
Gyula

On Tue, Apr 21, 2015 at 10:50 PM, Bruno Cadonna <cado...@informatik.hu-berlin.de> wrote:
> Hi Gyula,
>
> fair enough! I used a bad example.
>
> What I really wanted to know is whether your code supports only
> aggregations like sum, min, and max, where you need to pass only a value
> to the next aggregation, or also more complex data structures, e.g., a
> synopsis of the full stream, to compute an aggregation such as an
> approximate count distinct (item count)?
>
> Cheers,
> Bruno
>
> On 21.04.2015 15:18, Gyula Fóra wrote:
> > You are right, but you should never try to compute a full stream
> > median, that's the point :D
> >
> > On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna wrote:
> >
> > Hi Gyula,
> >
> > I read the comments on your PR. I have a question about this comment:
> >
> > "It only allows aggregations so we dont need to keep the full
> > history in a buffer."
> >
> > What if the user implements an aggregation function like a median?
> > For a median you need the full history, don't you?
> >
> > Am I missing something?
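The fold-plus-combiner shape Gyula describes (fold T,R => R applied independently per parallel partition, combiner R,R => R merging the partial aggregates) can be sketched in plain Java. This is only an illustration of the semantics, not the proposed Flink API; all class and method names here are made up. Note that it can express aggregations a plain T,T => T reduce cannot, such as an average:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

/** Sketch: fold each partition with T,R => R, then merge partials with R,R => R. */
public class FoldCombineSketch {

    // Assumes the fold function does not mutate its accumulator argument,
    // so the shared initial value is safe to reuse per partition.
    static <T, R> R aggregate(List<List<T>> partitions, R initial,
                              BiFunction<T, R, R> fold, BinaryOperator<R> combine) {
        R result = initial;
        for (List<T> partition : partitions) {       // each partition folds independently (in parallel in practice)
            R partial = initial;
            for (T element : partition) {
                partial = fold.apply(element, partial);
            }
            result = combine.apply(result, partial); // combiner merges the partial aggregates
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        // average via a (sum, count) accumulator, which a binary reduce on T alone cannot express
        long[] acc = aggregate(partitions, new long[]{0, 0},
                (x, a) -> new long[]{a[0] + x, a[1] + 1},
                (a, b) -> new long[]{a[0] + b[0], a[1] + b[1]});
        System.out.println((double) acc[0] / acc[1]); // prints 3.0 (the average)
    }
}
```

A synopsis-style aggregation like the approximate distinct count Bruno asks about fits the same shape: R would be the synopsis, fold inserts an element into it, and the combiner merges two synopses.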
Re: Periodic full stream aggregations
-BEGIN PGP SIGNED MESSAGE- Hash: SHA1 Hi Gyula, fair enough! I used a bad example. What I really wanted to know is whether your code supports only aggregation like sum, min, and max where you need to pass only a value to the next aggregation or also more complex data structures, e.g., a synopsis of the full stream, to compute an aggregation such as an approximate count distinct (item count)? Cheers, Bruno On 21.04.2015 15:18, Gyula Fóra wrote: > You are right, but you should never try to compute full stream > median, thats the point :D > > On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna < > cado...@informatik.hu-berlin.de> wrote: > > Hi Gyula, > > I read your comments of your PR. > > I have a question to this comment: > > "It only allows aggregations so we dont need to keep the full > history in a buffer." > > What if the user implements an aggregation function like a median? > > For a median you need the full history, don't you? > > Am I missing something? > > Cheers, Bruno > > On 21.04.2015 14:31, Gyula Fóra wrote: I have opened a PR for this feature: https://github.com/apache/flink/pull/614 Cheers, Gyula On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra wrote: > Thats a good idea, I will modify my PR to that :) > > Gyula > > On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske > wrote: > >> Is it possible to switch the order of the statements, >> i.e., >> >> dataStream.every(Time.of(4,sec)).reduce(...) instead of >> dataStream.reduce(...).every(Time.of(4,sec)) >> >> I think that would be more consistent with the structure >> of the remaining API. >> >> Cheers, Fabian >> >> 2015-04-21 10:57 GMT+02:00 Gyula Fóra >> : >> >>> Hi Bruno, >>> >>> Of course you can do that as well. (That's the good >>> part :p ) >>> >>> I will open a PR soon with the proposed changes (first >>> without breaking >> the >>> current Api) and I will post it here. 
>>> >>> Cheers, Gyula >>> >>> On Tuesday, April 21, 2015, Bruno Cadonna < >> cado...@informatik.hu-berlin.de >>> wrote: >>> Hi Gyula, I have a question regarding your suggestion. Can the current continuous aggregation be also specified with your proposed periodic aggregation? I am thinking about something like dataStream.reduce(...).every(Count.of(1)) Cheers, Bruno On 20.04.2015 22:32, Gyula Fóra wrote: >> Hey all, >> >> I think we are missing a quite useful feature >> that could be implemented (with some slight >> modifications) on top of the current windowing >> api. >> >> We currently provide 2 ways of aggregating (or >> reducing) over streams: doing a continuous >> aggregation and always output the aggregated >> value (which cannot be done properly in parallel) >> or doing aggregation in a window periodically. >> >> What we don't have at the moment is periodic >> aggregations on the whole stream. I would even go >> as far as to remove the continuous outputting >> reduce/aggregate it and replace it with this >> version as this in return can be done properly in >> parallel. >> >> My suggestion would be that a call: >> >> dataStream.reduce(..) dataStream.sum(..) >> >> would return a windowed data stream where the >> window is the whole record history, and the user >> would need to define a trigger to get the actual >> reduced values like: >> >> dataStream.reduce(...).every(Time.of(4,sec)) to >> get the actual reduced results. >> dataStream.sum(...).every(...) >> >> I think the current data stream >> reduce/aggregation is very confusing without >> being practical for any normal use-case. >> >> Also this would be a very api breaking change >> (but I would still make this change as it is much >> more intuitive than the current behaviour) so I >> would try to push it before the release if we can >> agree. >> >> Cheers, Gyula >> >>> >> > > > >> > - -- ~~~ Dr. 
Bruno Cadonna
Postdoctoral Researcher

Databases and Information Systems
Department of Computer Science
Humboldt-Universität zu Berlin

http://www.informatik.hu-berlin.de/~cadonnab
Re: Fault Tolerance for Flink Iterations
Hi Markus!

I see your point. My first guess would be that it would be simpler to do this logic in the driver program, rather than inside the JobManager. If the checkpoints are all written and the job fails, you check which was the latest completed checkpoint (by file) and then start the program again with the source that refers to those files. That way, you go through the proper stack (optimizer and jobgraph generator) that inserts all the necessary partition and sort operations.

Greetings,
Stephan

On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer <holzemer.mar...@googlemail.com> wrote:
> Hi everybody,
>
> I am writing my master's thesis about making Flink iterations / iterative
> Flink algorithms fault tolerant. The first approach I implemented is basic
> checkpointing, where every N iterations the current state is saved into HDFS.
> To do this I enabled data sinks inside of iterations, then attached a new
> checkpointing sink to the beginning of each iteration. To recover from a
> previous checkpoint I cancel all tasks, add a new data source in front of
> the iteration and reschedule the tasks with a lower DOP. I do this out of the
> JobManager during runtime, without starting a new job.
>
> The problem is that sometimes the input data to the iteration has some
> properties, like a certain partitioning or sorting, and I am struggling
> with reconstructing these properties from the checkpoint source. I figured
> that an easier way to do this is to re-optimize the new plan (with the new
> source as input to the iteration) before the rescheduling. But in the
> current project structure flink-runtime has no access to flink-optimizer,
> and it would be a major design break to change this.
>
> Has somebody any advice on this?
>
> best,
> Markus
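The driver-side recovery Stephan suggests boils down to "pick the newest checkpoint whose write fully completed, then resubmit with a source pointing at it". A minimal sketch in plain Java; the `Checkpoint` type, the paths, and the completed-flag convention (e.g. a success marker file per checkpoint directory) are assumptions for illustration, not Flink code:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of driver-program recovery: after a job failure, select the
 * latest fully written checkpoint and restart the program from it.
 */
public class CheckpointRecovery {

    /** A checkpoint directory, the iteration it was taken at, and whether its write completed. */
    static class Checkpoint {
        final String path;
        final int iteration;
        final boolean completed;

        Checkpoint(String path, int iteration, boolean completed) {
            this.path = path;
            this.iteration = iteration;
            this.completed = completed;
        }
    }

    /** Returns the newest checkpoint whose write fully completed, or null if none exists. */
    static Checkpoint latestCompleted(List<Checkpoint> checkpoints) {
        Checkpoint best = null;
        for (Checkpoint c : checkpoints) {
            if (c.completed && (best == null || c.iteration > best.iteration)) {
                best = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Checkpoint> found = new ArrayList<>();
        found.add(new Checkpoint("hdfs:///ckpt/iter-10", 10, true));
        found.add(new Checkpoint("hdfs:///ckpt/iter-20", 20, true));
        found.add(new Checkpoint("hdfs:///ckpt/iter-30", 30, false)); // job failed mid-write

        // The driver would now resubmit the job with its source reading from here,
        // letting the optimizer re-insert the needed partitioning and sorting.
        System.out.println(latestCompleted(found).path); // prints hdfs:///ckpt/iter-20
    }
}
```

Because the restart goes through the normal submission path, the physical properties (partitioning, sort order) Markus is struggling to reconstruct are re-derived by the optimizer rather than patched into a running plan.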
Re: About Operator and OperatorBase
Thanks for the explanation, Stephan. I always wondered why the extra common API existed.

I think this should be high priority if we want to remove the common API to reduce the unnecessary layer and "dead code". As Ufuk mentioned before, it is better to do it now, before more stuff is built on top of Flink. So removing the old Record API [1] and the tests depending on it is step one of the process, but what is the JSON API?

- Henry

[1] https://issues.apache.org/jira/browse/FLINK-1681

On Tue, Apr 21, 2015 at 1:10 AM, Stephan Ewen wrote:
> Originally, we had multiple APIs with different data models: the current
> Java API, the Record API, a JSON API. The common API was the data-model
> agnostic set of operators on which they were built.
>
> It became redundant when we saw how well things can be built on top of
> the Java API, using the TypeInformation. Now, Scala, Python, Dataflow all
> build on top of the Java API.
[jira] [Created] (FLINK-1919) Add HCatOutputFormat for Tuple data types
Fabian Hueske created FLINK-1919:
------------------------------------

Summary: Add HCatOutputFormat for Tuple data types
Key: FLINK-1919
URL: https://issues.apache.org/jira/browse/FLINK-1919
Project: Flink
Issue Type: New Feature
Components: Java API, Scala API
Reporter: Fabian Hueske
Priority: Minor

It would be good to have an OutputFormat that can write data to HCatalog tables. The Hadoop `HCatOutputFormat` expects `HCatRecord` objects and writes these to HCatalog tables. We can do the same thing by creating these `HCatRecord` objects with a Map function that precedes a `HadoopOutputFormat` wrapping the Hadoop `HCatOutputFormat`.

Better support for Flink Tuples can be added by implementing a custom `HCatOutputFormat` that also depends on the Hadoop `HCatOutputFormat` but internally converts Flink Tuples to `HCatRecords`. This would also include checking whether the schema of the HCatalog table and the Flink tuples match.

For data types other than tuples, the OutputFormat could either require a preceding Map function that converts to `HCatRecords`, or let users specify a MapFunction and invoke it internally. We already have a Flink `HCatInputFormat` which does this in the reverse direction, i.e., it emits Flink Tuples from HCatalog tables.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
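The conversion-plus-schema-check step the issue describes can be sketched in plain Java with stand-in types: the record is modeled as a `List<Object>` and the table schema as a list of expected field classes. The real HCatalog classes (`HCatRecord`, `HCatSchema`) are deliberately not used here; every name below is a placeholder:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Stand-in sketch: convert a tuple's fields into a record after a schema check. */
public class TupleToRecordSketch {

    /** Verify arity and field types against the (stand-in) table schema. */
    static void checkSchema(Object[] tupleFields, List<Class<?>> schema) {
        if (tupleFields.length != schema.size()) {
            throw new IllegalArgumentException("Arity mismatch: tuple has "
                    + tupleFields.length + " fields, table expects " + schema.size());
        }
        for (int i = 0; i < tupleFields.length; i++) {
            if (!schema.get(i).isInstance(tupleFields[i])) {
                throw new IllegalArgumentException(
                        "Field " + i + " is not a " + schema.get(i).getName());
            }
        }
    }

    /** The conversion a preceding Map function (or the format itself) would perform. */
    static List<Object> toRecord(Object[] tupleFields, List<Class<?>> schema) {
        checkSchema(tupleFields, schema);
        return new ArrayList<>(Arrays.asList(tupleFields));
    }

    public static void main(String[] args) {
        List<Class<?>> schema = Arrays.asList(String.class, Integer.class);
        System.out.println(toRecord(new Object[]{"flink", 42}, schema)); // prints [flink, 42]
    }
}
```

In the proposed format, this logic would run once per tuple before handing the record to the wrapped Hadoop output format, with the schema check ideally done once at open time rather than per record.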
Re: Periodic full stream aggregations
You are right, but you should never try to compute full stream median, thats the point :D On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna < cado...@informatik.hu-berlin.de> wrote: > -BEGIN PGP SIGNED MESSAGE- > Hash: SHA1 > > Hi Gyula, > > I read your comments of your PR. > > I have a question to this comment: > > "It only allows aggregations so we dont need to keep the full history > in a buffer." > > What if the user implements an aggregation function like a median? > > For a median you need the full history, don't you? > > Am I missing something? > > Cheers, > Bruno > > On 21.04.2015 14:31, Gyula Fóra wrote: > > I have opened a PR for this feature: > > > > https://github.com/apache/flink/pull/614 > > > > Cheers, Gyula > > > > On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra > > wrote: > > > >> Thats a good idea, I will modify my PR to that :) > >> > >> Gyula > >> > >> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske > >> wrote: > >> > >>> Is it possible to switch the order of the statements, i.e., > >>> > >>> dataStream.every(Time.of(4,sec)).reduce(...) instead of > >>> dataStream.reduce(...).every(Time.of(4,sec)) > >>> > >>> I think that would be more consistent with the structure of the > >>> remaining API. > >>> > >>> Cheers, Fabian > >>> > >>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra : > >>> > Hi Bruno, > > Of course you can do that as well. (That's the good part :p > ) > > I will open a PR soon with the proposed changes (first > without breaking > >>> the > current Api) and I will post it here. > > Cheers, Gyula > > On Tuesday, April 21, 2015, Bruno Cadonna < > >>> cado...@informatik.hu-berlin.de > > > wrote: > > > Hi Gyula, > > > > I have a question regarding your suggestion. > > > > Can the current continuous aggregation be also specified with your > > proposed periodic aggregation? 
> > > > I am thinking about something like > > > > dataStream.reduce(...).every(Count.of(1)) > > > > Cheers, Bruno > > > > On 20.04.2015 22:32, Gyula Fóra wrote: > >>> Hey all, > >>> > >>> I think we are missing a quite useful feature that > >>> could be implemented (with some slight modifications) > >>> on top of the current windowing api. > >>> > >>> We currently provide 2 ways of aggregating (or > >>> reducing) over streams: doing a continuous aggregation > >>> and always output the aggregated value (which cannot be > >>> done properly in parallel) or doing aggregation in a > >>> window periodically. > >>> > >>> What we don't have at the moment is periodic > >>> aggregations on the whole stream. I would even go as > >>> far as to remove the continuous outputting > >>> reduce/aggregate it and replace it with this version > >>> as this in return can be done properly in parallel. > >>> > >>> My suggestion would be that a call: > >>> > >>> dataStream.reduce(..) dataStream.sum(..) > >>> > >>> would return a windowed data stream where the window is > >>> the whole record history, and the user would need to > >>> define a trigger to get the actual reduced values > >>> like: > >>> > >>> dataStream.reduce(...).every(Time.of(4,sec)) to get the > >>> actual reduced results. dataStream.sum(...).every(...) > >>> > >>> I think the current data stream reduce/aggregation is > >>> very confusing without being practical for any normal > >>> use-case. > >>> > >>> Also this would be a very api breaking change (but I > >>> would still make this change as it is much more > >>> intuitive than the current behaviour) so I would try to > >>> push it before the release if we can agree. > >>> > >>> Cheers, Gyula > >>> > > > > > > >>> > >> > >> > > > > - -- > ~~~ > > Dr. 
Bruno Cadonna
> Postdoctoral Researcher
>
> Databases and Information Systems
> Department of Computer Science
> Humboldt-Universität zu Berlin
>
> http://www.informatik.hu-berlin.de/~cadonnab
>
> ~~~
Re: Periodic full stream aggregations
-BEGIN PGP SIGNED MESSAGE- Hash: SHA1 Hi Gyula, I read your comments of your PR. I have a question to this comment: "It only allows aggregations so we dont need to keep the full history in a buffer." What if the user implements an aggregation function like a median? For a median you need the full history, don't you? Am I missing something? Cheers, Bruno On 21.04.2015 14:31, Gyula Fóra wrote: > I have opened a PR for this feature: > > https://github.com/apache/flink/pull/614 > > Cheers, Gyula > > On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra > wrote: > >> Thats a good idea, I will modify my PR to that :) >> >> Gyula >> >> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske >> wrote: >> >>> Is it possible to switch the order of the statements, i.e., >>> >>> dataStream.every(Time.of(4,sec)).reduce(...) instead of >>> dataStream.reduce(...).every(Time.of(4,sec)) >>> >>> I think that would be more consistent with the structure of the >>> remaining API. >>> >>> Cheers, Fabian >>> >>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra : >>> Hi Bruno, Of course you can do that as well. (That's the good part :p ) I will open a PR soon with the proposed changes (first without breaking >>> the current Api) and I will post it here. Cheers, Gyula On Tuesday, April 21, 2015, Bruno Cadonna < >>> cado...@informatik.hu-berlin.de > wrote: > Hi Gyula, > > I have a question regarding your suggestion. > > Can the current continuous aggregation be also specified with your > proposed periodic aggregation? > > I am thinking about something like > > dataStream.reduce(...).every(Count.of(1)) > > Cheers, Bruno > > On 20.04.2015 22:32, Gyula Fóra wrote: >>> Hey all, >>> >>> I think we are missing a quite useful feature that >>> could be implemented (with some slight modifications) >>> on top of the current windowing api. 
>>> We currently provide 2 ways of aggregating (or reducing) over
>>> streams: doing a continuous aggregation and always outputting the
>>> aggregated value (which cannot be done properly in parallel), or
>>> doing aggregation in a window periodically.
>>>
>>> What we don't have at the moment is periodic aggregations on the
>>> whole stream. I would even go as far as to remove the continuously
>>> outputting reduce/aggregate and replace it with this version, as
>>> this in return can be done properly in parallel.
>>>
>>> My suggestion would be that a call:
>>>
>>> dataStream.reduce(..) dataStream.sum(..)
>>>
>>> would return a windowed data stream where the window is the whole
>>> record history, and the user would need to define a trigger to get
>>> the actual reduced values, like:
>>>
>>> dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
>>> reduced results. dataStream.sum(...).every(...)
>>>
>>> I think the current data stream reduce/aggregation is very
>>> confusing without being practical for any normal use-case.
>>>
>>> Also this would be a very API-breaking change (but I would still
>>> make this change, as it is much more intuitive than the current
>>> behaviour), so I would try to push it before the release if we can
>>> agree.
>>>
>>> Cheers, Gyula

- --
~~~

Dr. Bruno Cadonna
Postdoctoral Researcher

Databases and Information Systems
Department of Computer Science
Humboldt-Universität zu Berlin

http://www.informatik.hu-berlin.de/~cadonnab

~~~
Re: Periodic full stream aggregations
I have opened a PR for this feature: https://github.com/apache/flink/pull/614 Cheers, Gyula On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra wrote: > Thats a good idea, I will modify my PR to that :) > > Gyula > > On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske wrote: > >> Is it possible to switch the order of the statements, i.e., >> >> dataStream.every(Time.of(4,sec)).reduce(...) instead of >> dataStream.reduce(...).every(Time.of(4,sec)) >> >> I think that would be more consistent with the structure of the remaining >> API. >> >> Cheers, Fabian >> >> 2015-04-21 10:57 GMT+02:00 Gyula Fóra : >> >> > Hi Bruno, >> > >> > Of course you can do that as well. (That's the good part :p ) >> > >> > I will open a PR soon with the proposed changes (first without breaking >> the >> > current Api) and I will post it here. >> > >> > Cheers, >> > Gyula >> > >> > On Tuesday, April 21, 2015, Bruno Cadonna < >> cado...@informatik.hu-berlin.de >> > > >> > wrote: >> > >> > > -BEGIN PGP SIGNED MESSAGE- >> > > Hash: SHA1 >> > > >> > > Hi Gyula, >> > > >> > > I have a question regarding your suggestion. >> > > >> > > Can the current continuous aggregation be also specified with your >> > > proposed periodic aggregation? >> > > >> > > I am thinking about something like >> > > >> > > dataStream.reduce(...).every(Count.of(1)) >> > > >> > > Cheers, >> > > Bruno >> > > >> > > On 20.04.2015 22:32, Gyula Fóra wrote: >> > > > Hey all, >> > > > >> > > > I think we are missing a quite useful feature that could be >> > > > implemented (with some slight modifications) on top of the current >> > > > windowing api. >> > > > >> > > > We currently provide 2 ways of aggregating (or reducing) over >> > > > streams: doing a continuous aggregation and always output the >> > > > aggregated value (which cannot be done properly in parallel) or >> > > > doing aggregation in a window periodically. >> > > > >> > > > What we don't have at the moment is periodic aggregations on the >> > > > whole stream. 
I would even go as far as to remove the continuous >> > > > outputting reduce/aggregate it and replace it with this version as >> > > > this in return can be done properly in parallel. >> > > > >> > > > My suggestion would be that a call: >> > > > >> > > > dataStream.reduce(..) dataStream.sum(..) >> > > > >> > > > would return a windowed data stream where the window is the whole >> > > > record history, and the user would need to define a trigger to get >> > > > the actual reduced values like: >> > > > >> > > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual >> > > > reduced results. dataStream.sum(...).every(...) >> > > > >> > > > I think the current data stream reduce/aggregation is very >> > > > confusing without being practical for any normal use-case. >> > > > >> > > > Also this would be a very api breaking change (but I would still >> > > > make this change as it is much more intuitive than the current >> > > > behaviour) so I would try to push it before the release if we can >> > > > agree. >> > > > >> > > > Cheers, Gyula >> > > > >> > > >> > > - -- >> > > ~~~ >> > > >> > > Dr. Bruno Cadonna >> > > Postdoctoral Researcher >> > > >> > > Databases and Information Systems >> > > Department of Computer Science >> > > Humboldt-Universität zu Berlin >> > > >> > > http://www.informatik.hu-berlin.de/~cadonnab >> > > >> > > ~~~ >> > > -BEGIN PGP SIGNATURE- >> > > Version: GnuPG v1.4.11 (GNU/Linux) >> > > >> > > iQEcBAEBAgAGBQJVNgaeAAoJEKdCIJx7flKwfiMH/AzPpKtse9eMOzFsXSuBslNr >> > > PZRQ0vpI7vw9eYFIuqp33SltN0zmLmDt3VzgJz0EZK5zSRCF9NOeke1emQwlrPsB >> > > g65a4XccWT2qPotodF39jTTdE5epeUf8NdE552sr+Ya5LMtt8TmozD0lEOVfNt7n >> > > R6KQdDU70U0zoCPwv0S13cak8a8k7phGvShXeW4nSZKp8C+WJa3IbUZkHlIlkC1L >> > > OnyYy4b14bnfjiknKt2mKcjLG7eQEq0X6aN85Zf+5X8BUg3auk9N9Cva2XMRuD1p >> > > gOoC+2gPZcr2IB9Sgs+s5pxfhaoVpbQ9Z7gRh8BkWqftveA7RD6KymmBxoUtujA= >> > > =8bVQ >> > > -END PGP SIGNATURE- >> > > >> > >> > >
Re: Akka transparency and serialisation
Good point to raise, Paris. Here are the practices I (and others) have been using; they work well:

1) Do not assume serialization; that is true. If you need to make sure that the instance of the data is not shared after the message, send a manually serialized version. The "InstantiationUtil" class has methods to serialize/deserialize into/from byte arrays.

2) If the data involves user-defined classes, always serialize manually, because the deserialization in the Akka stack will not use the required user-code classloader. Have a look at the class "SerializedValue", which serializes eagerly and deserializes lazily (with a given class loader) to overcome these situations.

3) I totally agree to not make any assumptions about the behavior of transient fields.

Stephan

On Tue, Apr 21, 2015 at 1:41 PM, Paris Carbone wrote:
> Hello everyone,
>
> Many of you are already aware of this, but it is good to make it clear on
> the mailing list. We bumped into this "special" case with Akka several
> times already, and it is important to know where transparency actually
> breaks.
>
> In short, Akka serialises only messages that get transferred over the wire
> or across JVMs [1]. Thus, we should not rely on messages getting serialised
> for anything we want to transfer using Akka. To overcome this we should
> either:
>
> 1) Do a deep copy of everything passed via Akka messaging,
> 2) Apply serialisation manually before sending messages and transfer only
> pre-serialized data, or
> 3) Never rely on transient fields.
>
> cheers
> Paris
>
> [1] http://doc.akka.io/docs/akka/snapshot/general/remoting.html
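The eager-serialize / lazy-deserialize pattern Stephan points to can be sketched with plain java.io serialization. This is a simplified illustration of the idea, not Flink's actual SerializedValue class; the class name and methods here are made up, and the custom classloader hook is the key part for user-code classes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/**
 * Sketch: serialize eagerly at construction, so a message holder never
 * relies on Akka serializing it; deserialize lazily with a caller-supplied
 * classloader (e.g. the user-code classloader).
 */
public class SerializedValueSketch<T> implements Serializable {

    private final byte[] bytes; // the value, serialized up front

    public SerializedValueSketch(T value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        this.bytes = bos.toByteArray();
    }

    /** Deserialize with the given classloader, resolving classes through it. */
    @SuppressWarnings("unchecked")
    public T deserialize(final ClassLoader cl) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                return Class.forName(desc.getName(), false, cl);
            }
        }) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializedValueSketch<String> v = new SerializedValueSketch<>("hello");
        System.out.println(v.deserialize(SerializedValueSketch.class.getClassLoader())); // prints hello
    }
}
```

Because only the byte array travels in the message, local Akka delivery (which skips serialization) and remote delivery behave the same, which is exactly the transparency gap the thread warns about.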
Akka transparency and serialisation
Hello everyone, Many of you are already aware of this but it is good to make it clear in the mailist. We bumped into this "special" case with Akka several times already and it is important to know where transparency actually breaks. In short, Akka serialises only messages that get transferred over the wire or across JVMs [1]. Thus, we should not rely on messages getting serialised for anything we want to transfer using Akka. To overcome this we should either: 1) Do a deep copy of everything passed via Akka messaging 2) Apply serialisation manually before sending messages and transfer only pre-serialized data. 3) Never rely on transient fields cheers Paris [1] http://doc.akka.io/docs/akka/snapshot/general/remoting.html
Re: Periodic full stream aggregations
Thats a good idea, I will modify my PR to that :) Gyula On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske wrote: > Is it possible to switch the order of the statements, i.e., > > dataStream.every(Time.of(4,sec)).reduce(...) instead of > dataStream.reduce(...).every(Time.of(4,sec)) > > I think that would be more consistent with the structure of the remaining > API. > > Cheers, Fabian > > 2015-04-21 10:57 GMT+02:00 Gyula Fóra : > > > Hi Bruno, > > > > Of course you can do that as well. (That's the good part :p ) > > > > I will open a PR soon with the proposed changes (first without breaking > the > > current Api) and I will post it here. > > > > Cheers, > > Gyula > > > > On Tuesday, April 21, 2015, Bruno Cadonna < > cado...@informatik.hu-berlin.de > > > > > wrote: > > > > > -BEGIN PGP SIGNED MESSAGE- > > > Hash: SHA1 > > > > > > Hi Gyula, > > > > > > I have a question regarding your suggestion. > > > > > > Can the current continuous aggregation be also specified with your > > > proposed periodic aggregation? > > > > > > I am thinking about something like > > > > > > dataStream.reduce(...).every(Count.of(1)) > > > > > > Cheers, > > > Bruno > > > > > > On 20.04.2015 22:32, Gyula Fóra wrote: > > > > Hey all, > > > > > > > > I think we are missing a quite useful feature that could be > > > > implemented (with some slight modifications) on top of the current > > > > windowing api. > > > > > > > > We currently provide 2 ways of aggregating (or reducing) over > > > > streams: doing a continuous aggregation and always output the > > > > aggregated value (which cannot be done properly in parallel) or > > > > doing aggregation in a window periodically. > > > > > > > > What we don't have at the moment is periodic aggregations on the > > > > whole stream. I would even go as far as to remove the continuous > > > > outputting reduce/aggregate it and replace it with this version as > > > > this in return can be done properly in parallel. 
> > > >
> > > > My suggestion would be that a call:
> > > >
> > > > dataStream.reduce(..) dataStream.sum(..)
> > > >
> > > > would return a windowed data stream where the window is the whole
> > > > record history, and the user would need to define a trigger to get
> > > > the actual reduced values, like:
> > > >
> > > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
> > > > reduced results. dataStream.sum(...).every(...)
> > > >
> > > > I think the current data stream reduce/aggregation is very
> > > > confusing without being practical for any normal use-case.
> > > >
> > > > Also this would be a very API-breaking change (but I would still
> > > > make this change, as it is much more intuitive than the current
> > > > behaviour), so I would try to push it before the release if we can
> > > > agree.
> > > >
> > > > Cheers, Gyula
> > >
> > > - --
> > > ~~~
> > >
> > > Dr. Bruno Cadonna
> > > Postdoctoral Researcher
> > >
> > > Databases and Information Systems
> > > Department of Computer Science
> > > Humboldt-Universität zu Berlin
> > >
> > > http://www.informatik.hu-berlin.de/~cadonnab
> > >
> > > ~~~
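The proposed semantics (keep one running reduce over the whole stream, emit the current aggregate only when the trigger fires) can be sketched in plain Java. A count-based trigger stands in for the Time.of / Count.of triggers discussed above, and none of these names are the proposed Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Sketch: full-stream reduce that emits the running aggregate every n elements. */
public class PeriodicReduceSketch<T> {

    private final BinaryOperator<T> reducer;
    private final int every;   // stand-in for .every(Count.of(n))
    private T current;         // running aggregate over the whole record history
    private int seen;

    PeriodicReduceSketch(BinaryOperator<T> reducer, int every) {
        this.reducer = reducer;
        this.every = every;
    }

    /** Feed one element; returns the current aggregate when the trigger fires, else null. */
    T onElement(T element) {
        current = (current == null) ? element : reducer.apply(current, element);
        seen++;
        return (seen % every == 0) ? current : null;
    }

    public static void main(String[] args) {
        // conceptually: dataStream.sum(...).every(Count.of(2))
        PeriodicReduceSketch<Integer> sum = new PeriodicReduceSketch<>(Integer::sum, 2);
        List<Integer> emitted = new ArrayList<>();
        for (int i = 1; i <= 6; i++) {
            Integer out = sum.onElement(i);
            if (out != null) emitted.add(out);   // fires after elements 2, 4, 6
        }
        System.out.println(emitted);             // prints [3, 10, 21]
    }
}
```

Since only the compact aggregate is kept (never the element history), this parallelizes cleanly, which is the thread's argument for preferring it over the continuously emitting reduce, and also why a full-stream median does not fit the model.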
[jira] [Created] (FLINK-1918) NullPointerException at org.apache.flink.client.program.Client's constructor while using ExecutionEnvironment.createRemoteEnvironment
Zoltán Zvara created FLINK-1918:
------------------------------------

Summary: NullPointerException at org.apache.flink.client.program.Client's constructor while using ExecutionEnvironment.createRemoteEnvironment
Key: FLINK-1918
URL: https://issues.apache.org/jira/browse/FLINK-1918
Project: Flink
Issue Type: Bug
Components: YARN Client
Reporter: Zoltán Zvara

Trace:

{code}
Exception in thread "main" java.lang.NullPointerException
	at org.apache.flink.client.program.Client.<init>(Client.java:104)
	at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:86)
	at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
	at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:70)
	at Wordcount.main(Wordcount.java:23)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
{code}

The constructor is trying to set the configuration parameter {{jobmanager.rpc.address}} with {{jobManagerAddress.getAddress().getHostAddress()}}, but {{jobManagerAddress.holder.addr}} is {{null}}, while {{jobManagerAddress.holder.hostname}} and {{port}} hold the valid information.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
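The reported state (hostname and port valid, internal address null) matches how `java.net.InetSocketAddress` behaves when it holds an unresolved hostname: `getAddress()` returns null, so chaining `.getHostAddress()` onto it NPEs, while `getHostString()` stays safe. A small plain-JDK demonstration, with no Flink code involved; the hostname is made up:

```java
import java.net.InetSocketAddress;

/** Demonstrates why getAddress() can be null while hostname and port are valid. */
public class UnresolvedAddressDemo {
    public static void main(String[] args) {
        // An unresolved address keeps only hostname + port, no InetAddress.
        InetSocketAddress addr = InetSocketAddress.createUnresolved("jobmanager-host", 6123);

        System.out.println(addr.getAddress());     // prints null; .getHostAddress() on it would NPE
        System.out.println(addr.getHostString());  // prints jobmanager-host (safe accessor)
        System.out.println(addr.getPort());        // prints 6123
    }
}
```

So a fix along the lines of resolving the address first, or reading the hostname via `getHostString()` instead of `getAddress().getHostAddress()`, would avoid the NPE.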
[jira] [Created] (FLINK-1917) EOFException when running delta-iteration job
Stefan Bunk created FLINK-1917: -- Summary: EOFException when running delta-iteration job Key: FLINK-1917 URL: https://issues.apache.org/jira/browse/FLINK-1917 Project: Flink Issue Type: Bug Components: Core, Distributed Runtime, Iterations Environment: 0.9-milestone-1; exception on the cluster, local execution works Reporter: Stefan Bunk The delta-iteration program in [1] ends with a {{java.io.EOFException}}:
{code}
java.io.EOFException
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeLong(AbstractPagedOutputView.java:291)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeDouble(AbstractPagedOutputView.java:307)
at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:62)
at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:26)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)
{code}
For logs and the accompanying mailing list discussion see below. When running with a slightly different memory configuration, as hinted on the mailing list, I sometimes also get this exception:
{code}
19.Apr. 13:39:29 INFO Task - IterationHead(WorksetIteration (Resolved-Redirects)) (10/10) switched to FAILED : java.lang.IndexOutOfBoundsException: Index: 161, Size: 161
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.resetTo(InMemoryPartition.java:352)
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.access$100(InMemoryPartition.java:301)
at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:226)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)
{code}
[1] Flink job: https://gist.github.com/knub/0bfec859a563009c1d57 [2] Job manager logs: https://gist.github.com/knub/01e3a4b0edb8cde66ff4 [3] One task manager's logs: https://gist.github.com/knub/8f2f953da95c8d7adefc [4] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-when-running-Flink-job-td1092.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-1916) EOFException when running delta-iteration job
Stefan Bunk created FLINK-1916: -- Summary: EOFException when running delta-iteration job Key: FLINK-1916 URL: https://issues.apache.org/jira/browse/FLINK-1916 Project: Flink Issue Type: Bug Environment: 0.9-milestone-1; exception on the cluster, local execution works Reporter: Stefan Bunk The delta-iteration program in [1] ends with a {{java.io.EOFException}}:
{code}
java.io.EOFException
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeLong(AbstractPagedOutputView.java:291)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeDouble(AbstractPagedOutputView.java:307)
at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:62)
at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:26)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)
{code}
For logs and the accompanying mailing list discussion see below. When running with a slightly different memory configuration, as hinted on the mailing list, I sometimes also get this exception:
{code}
19.Apr. 13:39:29 INFO Task - IterationHead(WorksetIteration (Resolved-Redirects)) (10/10) switched to FAILED : java.lang.IndexOutOfBoundsException: Index: 161, Size: 161
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.resetTo(InMemoryPartition.java:352)
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.access$100(InMemoryPartition.java:301)
at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:226)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)
{code}
[1] Flink job: https://gist.github.com/knub/0bfec859a563009c1d57 [2] Job manager logs: https://gist.github.com/knub/01e3a4b0edb8cde66ff4 [3] One task manager's logs: https://gist.github.com/knub/8f2f953da95c8d7adefc [4] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-when-running-Flink-job-td1092.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: About Operator and OperatorBase
Originally, we had multiple APIs with different data models: the current Java API, the Record API, and a JSON API. The common API was the data-model-agnostic set of operators on which they were built. It became redundant once we saw how well things can be built on top of the Java API using the TypeInformation. Now Scala, Python, and Dataflow all build on top of the Java API.
Re: Periodic full stream aggregations
Is it possible to switch the order of the statements, i.e., dataStream.every(Time.of(4,sec)).reduce(...) instead of dataStream.reduce(...).every(Time.of(4,sec)) I think that would be more consistent with the structure of the remaining API. Cheers, Fabian 2015-04-21 10:57 GMT+02:00 Gyula Fóra : > Hi Bruno, > > Of course you can do that as well. (That's the good part :p ) > > I will open a PR soon with the proposed changes (first without breaking the > current Api) and I will post it here. > > Cheers, > Gyula > > On Tuesday, April 21, 2015, Bruno Cadonna > > wrote: > > > > Hi Gyula, > > > > I have a question regarding your suggestion. > > > > Can the current continuous aggregation be also specified with your > > proposed periodic aggregation? > > > > I am thinking about something like > > > > dataStream.reduce(...).every(Count.of(1)) > > > > Cheers, > > Bruno > > > > On 20.04.2015 22:32, Gyula Fóra wrote: > > > Hey all, > > > > > > I think we are missing a quite useful feature that could be > > > implemented (with some slight modifications) on top of the current > > > windowing api. > > > > > > We currently provide 2 ways of aggregating (or reducing) over > > > streams: doing a continuous aggregation and always output the > > > aggregated value (which cannot be done properly in parallel) or > > > doing aggregation in a window periodically. > > > > > > What we don't have at the moment is periodic aggregations on the > > > whole stream. I would even go as far as to remove the continuous > > > outputting reduce/aggregate it and replace it with this version as > > > this in return can be done properly in parallel. > > > > > > My suggestion would be that a call: > > > > > > dataStream.reduce(..) dataStream.sum(..)
> > > > > > would return a windowed data stream where the window is the whole > > > record history, and the user would need to define a trigger to get > > > the actual reduced values like: > > > > > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual > > > reduced results. dataStream.sum(...).every(...) > > > > > > I think the current data stream reduce/aggregation is very > > > confusing without being practical for any normal use-case. > > > > > > Also this would be a very api breaking change (but I would still > > > make this change as it is much more intuitive than the current > > > behaviour) so I would try to push it before the release if we can > > > agree. > > > > > > Cheers, Gyula > > >
Re: Periodic full stream aggregations
Hi Bruno, Of course you can do that as well. (That's the good part :p ) I will open a PR soon with the proposed changes (first without breaking the current Api) and I will post it here. Cheers, Gyula On Tuesday, April 21, 2015, Bruno Cadonna wrote: > Hi Gyula, > > I have a question regarding your suggestion. > > Can the current continuous aggregation be also specified with your > proposed periodic aggregation? > > I am thinking about something like > > dataStream.reduce(...).every(Count.of(1)) > > Cheers, > Bruno > > On 20.04.2015 22:32, Gyula Fóra wrote: > > Hey all, > > > > I think we are missing a quite useful feature that could be > > implemented (with some slight modifications) on top of the current > > windowing api. > > > > We currently provide 2 ways of aggregating (or reducing) over > > streams: doing a continuous aggregation and always output the > > aggregated value (which cannot be done properly in parallel) or > > doing aggregation in a window periodically. > > > > What we don't have at the moment is periodic aggregations on the > > whole stream. I would even go as far as to remove the continuous > > outputting reduce/aggregate it and replace it with this version as > > this in return can be done properly in parallel. > > > > My suggestion would be that a call: > > > > dataStream.reduce(..) dataStream.sum(..) > > > > would return a windowed data stream where the window is the whole > > record history, and the user would need to define a trigger to get > > the actual reduced values like: > > > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual > > reduced results. dataStream.sum(...).every(...) > > > > I think the current data stream reduce/aggregation is very > > confusing without being practical for any normal use-case.
> > > > Also this would be a very api breaking change (but I would still > > make this change as it is much more intuitive than the current > > behaviour) so I would try to push it before the release if we can > > agree. > > > > Cheers, Gyula > >
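Alongside the plain reduce discussed in this thread, the grouped flavour of the same aggregation keeps one running reduced value per user-defined key, i.e. a map of aggregates over the full stream. A plain-Java sketch of those semantics (the key selector and all names here are illustrative, not Flink API):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class GroupedReduceSketch {
    // Applies the reduce function per key; the map holds the current
    // aggregate for every key seen so far on the full stream.
    static <T, K> Map<K, T> groupedReduce(Iterable<T> stream,
                                          Function<T, K> keySelector,
                                          BinaryOperator<T> reducer) {
        Map<K, T> state = new LinkedHashMap<>();
        for (T element : stream) {
            state.merge(keySelector.apply(element), element, reducer);
        }
        return state;  // a trigger would periodically emit a snapshot of this map
    }

    public static void main(String[] args) {
        Map<String, Integer> sums = groupedReduce(
                java.util.List.of(1, 2, 3, 4, 5),
                i -> (i % 2 == 0) ? "even" : "odd",  // hypothetical key selector
                Integer::sum);
        System.out.println(sums);  // odd -> 1+3+5 = 9, even -> 2+4 = 6
    }
}
```

A periodic trigger on top of this would emit the whole map (or the changed entries) every time it fires, rather than on every record.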
Re: Periodic full stream aggregations
Hi Gyula, I have a question regarding your suggestion. Can the current continuous aggregation be also specified with your proposed periodic aggregation? I am thinking about something like dataStream.reduce(...).every(Count.of(1)) Cheers, Bruno On 20.04.2015 22:32, Gyula Fóra wrote: > Hey all, > > I think we are missing a quite useful feature that could be > implemented (with some slight modifications) on top of the current > windowing api. > > We currently provide 2 ways of aggregating (or reducing) over > streams: doing a continuous aggregation and always output the > aggregated value (which cannot be done properly in parallel) or > doing aggregation in a window periodically. > > What we don't have at the moment is periodic aggregations on the > whole stream. I would even go as far as to remove the continuous > outputting reduce/aggregate it and replace it with this version as > this in return can be done properly in parallel. > > My suggestion would be that a call: > > dataStream.reduce(..) dataStream.sum(..) > > would return a windowed data stream where the window is the whole > record history, and the user would need to define a trigger to get > the actual reduced values like: > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual > reduced results. dataStream.sum(...).every(...) > > I think the current data stream reduce/aggregation is very > confusing without being practical for any normal use-case. > > Also this would be a very api breaking change (but I would still > make this change as it is much more intuitive than the current > behaviour) so I would try to push it before the release if we can > agree. > > Cheers, Gyula > - -- ~~~ Dr. Bruno Cadonna Postdoctoral Researcher Databases and Information Systems Department of Computer Science Humboldt-Universität zu Berlin http://www.informatik.hu-berlin.de/~cadonnab ~~~
Fault Tolerance for Flink Iterations
Hi everybody, I am writing my master's thesis on making Flink iterations / iterative Flink algorithms fault tolerant. The first approach I implemented is basic checkpointing, where every N iterations the current state is saved into HDFS. To do this I enabled data sinks inside of iterations and attached a new checkpointing sink to the beginning of each iteration. To recover from a previous checkpoint I cancel all tasks, add a new data source in front of the iteration, and reschedule the tasks with a lower degree of parallelism. I do this from the JobManager at runtime, without starting a new job. The problem is that sometimes the input data to the iteration has properties like a certain partitioning or sorting, and I am struggling with reconstructing these properties from the checkpoint source. I figured that an easier way to do this would be to re-optimize the new plan (with the new source as input to the iteration) before rescheduling. But in the current project structure flink-runtime has no access to flink-optimizer, and it would be a major design break to change this. Does anybody have advice on this? Best, Markus
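The checkpoint-every-N-iterations policy described above can be sketched as a simple driver loop. This is a toy illustration under heavy assumptions (a local file stands in for HDFS, a running sum stands in for the iteration state, and all names are hypothetical), not Flink's iteration runtime:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class IterationCheckpointSketch {
    static final int CHECKPOINT_INTERVAL = 3;  // N: checkpoint every N iterations

    // Runs `iterations` superstep updates on a toy state (a running sum),
    // snapshotting the state every CHECKPOINT_INTERVAL iterations.
    static long run(int iterations, Path checkpointDir) {
        long state = 0;
        for (int i = 1; i <= iterations; i++) {
            state += i;  // stand-in for one superstep's update
            if (i % CHECKPOINT_INTERVAL == 0) {
                try {  // in the real setting this would be a sink writing to HDFS
                    Files.writeString(checkpointDir.resolve("checkpoint-" + i), Long.toString(state));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }
        return state;
    }

    // Recovery: restart from the given snapshot instead of iteration 0.
    static long recover(Path checkpointDir, int lastCheckpointIteration, int iterations) {
        try {
            long state = Long.parseLong(Files.readString(
                    checkpointDir.resolve("checkpoint-" + lastCheckpointIteration)));
            for (int i = lastCheckpointIteration + 1; i <= iterations; i++) {
                state += i;  // replay only the iterations after the checkpoint
            }
            return state;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("ckpt");
        long full = run(10, dir);              // 1+2+...+10 = 55, checkpoints at 3, 6, 9
        long recovered = recover(dir, 9, 10);  // resume from the iteration-9 snapshot
        System.out.println(full + " " + recovered);
    }
}
```

Recovery resumes from the latest snapshot rather than iteration 0; the open problem from the mail, reconstructing partitioning or sorting properties of the checkpointed data, is exactly what this toy version ignores.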