Re: Periodic full stream aggregations

2015-04-21 Thread Gyula Fóra
Hey,

The current code supports 2 types of aggregations: a simple binary reduce,
T,T => T, and also the grouped version of this, where the reduce function is
applied per user-defined key (so there we keep a map of reduced values).
This can already be used to implement fairly complex logic if we transform
the data to a proper type before passing it to the reducer.

As a next step we can make this work with fold + combiners as well, where
your initial data type is T, your fold function is T,R => R, and a
combiner is R,R => R.

At that point I think any sensible aggregation can be implemented.
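
To pin down the types, here is a small standalone sketch (hypothetical interfaces, not the actual Flink API) of the three function shapes:

// Hypothetical helper interfaces, not the Flink API -- just to illustrate the
// three shapes: reduce T,T => T, fold T,R => R, combine R,R => R.
public class AggregationShapes {

    interface Reduce<T>  { T reduce(T a, T b); }             // T,T => T
    interface Fold<T, R> { R fold(T element, R partial); }   // T,R => R
    interface Combine<R> { R combine(R a, R b); }            // R,R => R

    public static void main(String[] args) {
        // Example: counting String elements as a fold + combiner (T = String, R = Long).
        Fold<String, Long> countFold = (element, partial) -> partial + 1L;
        Combine<Long> countCombine = (a, b) -> a + b;

        Long left  = countFold.fold("a", countFold.fold("b", 0L)); // partial result: 2
        Long right = countFold.fold("c", 0L);                      // partial result: 1
        System.out.println(countCombine.combine(left, right));     // prints 3
    }
}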

Regards,
Gyula


On Tue, Apr 21, 2015 at 10:50 PM, Bruno Cadonna <
cado...@informatik.hu-berlin.de> wrote:

>
> Hi Gyula,
>
> fair enough!
>
> I used a bad example.
>
> What I really wanted to know is whether your code supports only
> aggregation like sum, min, and max where you need to pass only a value
> to the next aggregation or also more complex data structures, e.g., a
> synopsis of the full stream, to compute an aggregation such as an
> approximate count distinct (item count)?
>
> Cheers,
> Bruno
>
> On 21.04.2015 15:18, Gyula Fóra wrote:
> > You are right, but you should never try to compute full stream
> > median, thats the point :D
> >
> > On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna <
> > cado...@informatik.hu-berlin.de> wrote:
> >
> > Hi Gyula,
> >
> > I read your comments of your PR.
> >
> > I have a question to this comment:
> >
> > "It only allows aggregations so we dont need to keep the full
> > history in a buffer."
> >
> > What if the user implements an aggregation function like a median?
> >
> > For a median you need the full history, don't you?
> >
> > Am I missing something?
> >
> > Cheers, Bruno
> >
> > On 21.04.2015 14:31, Gyula Fóra wrote:
>  I have opened a PR for this feature:
> 
>  https://github.com/apache/flink/pull/614
> 
>  Cheers, Gyula
> 
>  On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra
>   wrote:
> 
> > Thats a good idea, I will modify my PR to that :)
> >
> > Gyula
> >
> > On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske
> >  wrote:
> >
> >> Is it possible to switch the order of the statements,
> >> i.e.,
> >>
> >> dataStream.every(Time.of(4,sec)).reduce(...) instead of
> >> dataStream.reduce(...).every(Time.of(4,sec))
> >>
> >> I think that would be more consistent with the structure
> >> of the remaining API.
> >>
> >> Cheers, Fabian
> >>
> >> 2015-04-21 10:57 GMT+02:00 Gyula Fóra
> >> :
> >>
> >>> Hi Bruno,
> >>>
> >>> Of course you can do that as well. (That's the good
> >>> part :p )
> >>>
> >>> I will open a PR soon with the proposed changes (first
> >>> without breaking
> >> the
> >>> current Api) and I will post it here.
> >>>
> >>> Cheers, Gyula
> >>>
> >>> On Tuesday, April 21, 2015, Bruno Cadonna <
> >> cado...@informatik.hu-berlin.de
> 
> >>> wrote:
> >>>
>  Hi Gyula,
> 
>  I have a question regarding your suggestion.
> 
>  Can the current continuous aggregation be also specified with
>  your proposed periodic aggregation?
> 
>  I am thinking about something like
> 
>  dataStream.reduce(...).every(Count.of(1))
> 
>  Cheers, Bruno
> 
>  On 20.04.2015 22:32, Gyula Fóra wrote:
> >> Hey all,
> >>
> >> I think we are missing a quite useful feature
> >> that could be implemented (with some slight
> >> modifications) on top of the current windowing
> >> api.
> >>
> >> We currently provide 2 ways of aggregating (or
> >> reducing) over streams: doing a continuous
> >> aggregation and always output the aggregated
> >> value (which cannot be done properly in parallel)
> >> or doing aggregation in a window periodically.
> >>
> >> What we don't have at the moment is periodic
> >> aggregations on the whole stream. I would even go
> >> as far as to remove the continuous outputting
> >> reduce/aggregate it and replace it with this
> >> version as this in return can be done properly in
> >> parallel.
> >>
> >> My suggestion would be that a call:
> >>
> >> dataStream.reduce(..) dataStream.sum(..)
> >>
> >> would return a windowed data stream where the
> >> window is the whole record history, and the user
> >> would need to define a trigger to get the actual
> >> reduced values like:
> >>
> >> dataStream.reduce(...).every(Time.of(4,sec)) to
> >> get the actual reduced results.
> >> dataStream.sum(...).every(...)
> >>
> >> I think the current data stream
> >> reduce/aggregation is very confusing without
> >> being practical for any normal use-case.

Re: Periodic full stream aggregations

2015-04-21 Thread Bruno Cadonna

Hi Gyula,

fair enough!

I used a bad example.

What I really wanted to know is whether your code supports only
aggregations like sum, min, and max, where you only need to pass a single
value to the next aggregation, or also more complex data structures, e.g., a
synopsis of the full stream, to compute an aggregation such as an
approximate count distinct (distinct item count)?
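
To make that concrete, a set-based synopsis for a distinct count would fit the fold/combine shapes roughly like this (plain Java sketch, not the Flink API; an approximate version would swap the HashSet for something like a HyperLogLog sketch):

import java.util.HashSet;
import java.util.Set;

// Exact distinct count via a Set synopsis; the same two shapes would carry an
// approximate synopsis just as well.
public class DistinctCountSynopsis {

    // Fold step (T,R => R): add one element to the synopsis.
    static Set<String> fold(String element, Set<String> synopsis) {
        synopsis.add(element);
        return synopsis;
    }

    // Combine step (R,R => R): merge two partial synopses.
    static Set<String> combine(Set<String> a, Set<String> b) {
        Set<String> merged = new HashSet<>(a);
        merged.addAll(b);
        return merged;
    }

    public static void main(String[] args) {
        Set<String> partial1 = fold("x", fold("y", new HashSet<String>()));
        Set<String> partial2 = fold("x", new HashSet<String>());
        System.out.println(combine(partial1, partial2).size()); // 2 distinct items
    }
}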

Cheers,
Bruno

On 21.04.2015 15:18, Gyula Fóra wrote:
> You are right, but you should never try to compute full stream
> median, thats the point :D
> 
> On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna < 
> cado...@informatik.hu-berlin.de> wrote:
> 
> Hi Gyula,
> 
> I read your comments of your PR.
> 
> I have a question to this comment:
> 
> "It only allows aggregations so we dont need to keep the full
> history in a buffer."
> 
> What if the user implements an aggregation function like a median?
> 
> For a median you need the full history, don't you?
> 
> Am I missing something?
> 
> Cheers, Bruno
> 
> On 21.04.2015 14:31, Gyula Fóra wrote:
 I have opened a PR for this feature:
 
 https://github.com/apache/flink/pull/614
 
 Cheers, Gyula
 
 On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra
  wrote:
 
> Thats a good idea, I will modify my PR to that :)
> 
> Gyula
> 
> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske 
>  wrote:
> 
>> Is it possible to switch the order of the statements,
>> i.e.,
>> 
>> dataStream.every(Time.of(4,sec)).reduce(...) instead of 
>> dataStream.reduce(...).every(Time.of(4,sec))
>> 
>> I think that would be more consistent with the structure
>> of the remaining API.
>> 
>> Cheers, Fabian
>> 
>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra
>> :
>> 
>>> Hi Bruno,
>>> 
>>> Of course you can do that as well. (That's the good
>>> part :p )
>>> 
>>> I will open a PR soon with the proposed changes (first 
>>> without breaking
>> the
>>> current Api) and I will post it here.
>>> 
>>> Cheers, Gyula
>>> 
>>> On Tuesday, April 21, 2015, Bruno Cadonna <
>> cado...@informatik.hu-berlin.de
 
>>> wrote:
>>> 
 Hi Gyula,
 
 I have a question regarding your suggestion.
 
 Can the current continuous aggregation be also specified with
 your proposed periodic aggregation?
 
 I am thinking about something like
 
 dataStream.reduce(...).every(Count.of(1))
 
 Cheers, Bruno
 
 On 20.04.2015 22:32, Gyula Fóra wrote:
>> Hey all,
>> 
>> I think we are missing a quite useful feature
>> that could be implemented (with some slight
>> modifications) on top of the current windowing
>> api.
>> 
>> We currently provide 2 ways of aggregating (or 
>> reducing) over streams: doing a continuous
>> aggregation and always output the aggregated
>> value (which cannot be done properly in parallel)
>> or doing aggregation in a window periodically.
>> 
>> What we don't have at the moment is periodic 
>> aggregations on the whole stream. I would even go
>> as far as to remove the continuous outputting 
>> reduce/aggregate it and replace it with this
>> version as this in return can be done properly in
>> parallel.
>> 
>> My suggestion would be that a call:
>> 
>> dataStream.reduce(..) dataStream.sum(..)
>> 
>> would return a windowed data stream where the
>> window is the whole record history, and the user
>> would need to define a trigger to get the actual
>> reduced values like:
>> 
>> dataStream.reduce(...).every(Time.of(4,sec)) to
>> get the actual reduced results.
>> dataStream.sum(...).every(...)
>> 
>> I think the current data stream
>> reduce/aggregation is very confusing without
>> being practical for any normal use-case.
>> 
>> Also this would be a very api breaking change
>> (but I would still make this change as it is much
>> more intuitive than the current behaviour) so I
>> would try to push it before the release if we can
>> agree.
>> 
>> Cheers, Gyula
>> 
 
 
>>> 
>> 
> 
> 
 
> 
>> 
> 

- -- 
~~~

  Dr. Bruno Cadonna
  Postdoctoral Researcher

  Databases and Information Systems
  Department of Computer Science
  Humboldt-Universität zu Berlin

  http://www.informatik.hu-berlin.de/~cadonnab

~~~

Re: Fault Tolerance for Flink Iterations

2015-04-21 Thread Stephan Ewen
Hi Markus!

I see your point. My first guess would be that it would be simpler to do
this logic in the driver program, rather
than inside the JobManager. If the checkpoints are all written and the job
fails, you check what was the latest completed
checkpoint (by file) and then start the program again with the source that
refers to those files.

That way, you go through the proper stack (optimizer and jobgraph
generator) that inserts all the necessary partition and
sort operations.
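
A rough sketch of such a driver loop (all helper names here are placeholders for user code, not Flink API):

// Hypothetical driver-side restart loop; runIterationJob() and
// latestCompletedCheckpoint() are placeholders, not Flink API.
public class CheckpointedDriver {

    public static void main(String[] args) throws Exception {
        String inputPath = "hdfs:///data/initial-input";
        int maxAttempts = 3;

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                runIterationJob(inputPath); // submits the plan through the normal optimizer / job graph path
                return;                     // job finished successfully
            } catch (Exception jobFailure) {
                String checkpoint = latestCompletedCheckpoint("hdfs:///checkpoints/");
                if (checkpoint != null) {
                    inputPath = checkpoint; // resume from the newest fully written checkpoint
                }
            }
        }
    }

    // Placeholder: build and execute the iterative program with the given source path.
    static void runIterationJob(String sourcePath) throws Exception { /* ... */ }

    // Placeholder: scan the checkpoint directory and return the newest complete checkpoint file.
    static String latestCompletedCheckpoint(String checkpointDir) { return null; }
}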

Greetings,
Stephan



On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer <
holzemer.mar...@googlemail.com> wrote:

> Hi everybody,
>
> I am writing my master thesis about making flink iterations / iterative
> flink algorithms fault tolerant.
> The first approach I implemented is a basic checkpointing, where every N
> iterations the current state is saved into HDFS.
> To do this I enabled data sinks inside of iterations, then attached a new
> checkpointing sink to the beginning of each iteration. To recover from a
> previous checkpoint I cancel all tasks, add a new datasource in front of
> the iteration and reschedule the tasks with lower dop. I do this out of the
> JobManager during runtime without starting a new job.
> The problem is that sometimes the input data to the iteration has some
> properties like a certain partitioning or sorting, and I am struggling
> with reconstructing these properties from the checkpoint source.
> I figured that an easier way to do this is to re-optimize the new plan
> (with the new source as input to the iteration) before the rescheduling.
> But in the current project structure flink-runtime has no access to
> flink-optimizer and it would be a major design break to change this.
> Has somebody any advice on this?
>
> best,
> Markus
>


Re: About Operator and OperatorBase

2015-04-21 Thread Henry Saputra
Thanks for the explanation, Stephan. I had always wondered why the extra
common API layer exists.

Then I think this should be high priority if we want to remove the
common API to get rid of the unnecessary layer and "dead code". As Ufuk
mentioned before, it is better to do it now before more stuff is built on top
of Flink.

So removing the old Record API [1] and the tests that depend on it is step
one of the process, but what is the JSON API?

- Henry

[1] https://issues.apache.org/jira/browse/FLINK-1681

On Tue, Apr 21, 2015 at 1:10 AM, Stephan Ewen  wrote:
> Originally, we had multiple Apis with different data models: the current
> Java API, the record api, a JSON API. The common API was the data model
> agnostic set of operators on which they built.
>
> It became redundant when we saw how well things can be built on top of
> the Java API, using the TypeInformation. Now, Scala, Python, Dataflow, all
> build on top of the Java API.


[jira] [Created] (FLINK-1919) Add HCatOutputFormat for Tuple data types

2015-04-21 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-1919:


 Summary: Add HCatOutputFormat for Tuple data types
 Key: FLINK-1919
 URL: https://issues.apache.org/jira/browse/FLINK-1919
 Project: Flink
  Issue Type: New Feature
  Components: Java API, Scala API
Reporter: Fabian Hueske
Priority: Minor


It would be good to have an OutputFormat that can write data to HCatalog tables.

The Hadoop `HCatOutputFormat` expects `HCatRecord` objects and writes these to 
HCatalog tables. We can do the same thing by creating these `HCatRecord` 
objects with a Map function that precedes a `HadoopOutputFormat` that wraps the 
Hadoop `HCatOutputFormat`.
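
A rough sketch of such a Map function (assuming the HCatalog classes `HCatRecord` / `DefaultHCatRecord` from the Hive HCatalog dependency; field positions must match the table schema):

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;

// Converts a Flink Tuple2 into an HCatRecord before it is handed to the
// HadoopOutputFormat that wraps the Hadoop HCatOutputFormat.
public class TupleToHCatRecordMapper implements MapFunction<Tuple2<Integer, String>, HCatRecord> {

    @Override
    public HCatRecord map(Tuple2<Integer, String> tuple) throws Exception {
        DefaultHCatRecord record = new DefaultHCatRecord(2); // table with two columns
        record.set(0, tuple.f0); // e.g. an INT column
        record.set(1, tuple.f1); // e.g. a STRING column
        return record;
    }
}
{code}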

Better support for Flink Tuples can be added by implementing a custom 
`HCatOutputFormat` that also depends on the Hadoop `HCatOutputFormat` but 
internally converts Flink Tuples to `HCatRecords`. This would also include 
checking whether the schema of the HCatalog table and the Flink tuples match. 
For data types other than tuples, the OutputFormat could either require a 
preceding Map function that converts to `HCatRecords` or let users specify a 
MapFunction and invoke it internally.

We already have a Flink `HCatInputFormat` which does this in the reverse 
direction, i.e., it emits Flink Tuples from HCatalog tables.





Re: Periodic full stream aggregations

2015-04-21 Thread Gyula Fóra
You are right, but you should never try to compute a full-stream median,
that's the point :D

On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna <
cado...@informatik.hu-berlin.de> wrote:

>
> Hi Gyula,
>
> I read your comments of your PR.
>
> I have a question to this comment:
>
> "It only allows aggregations so we dont need to keep the full history
> in a buffer."
>
> What if the user implements an aggregation function like a median?
>
> For a median you need the full history, don't you?
>
> Am I missing something?
>
> Cheers,
> Bruno
>
> On 21.04.2015 14:31, Gyula Fóra wrote:
> > I have opened a PR for this feature:
> >
> > https://github.com/apache/flink/pull/614
> >
> > Cheers, Gyula
> >
> > On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra 
> > wrote:
> >
> >> Thats a good idea, I will modify my PR to that :)
> >>
> >> Gyula
> >>
> >> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske
> >>  wrote:
> >>
> >>> Is it possible to switch the order of the statements, i.e.,
> >>>
> >>> dataStream.every(Time.of(4,sec)).reduce(...) instead of
> >>> dataStream.reduce(...).every(Time.of(4,sec))
> >>>
> >>> I think that would be more consistent with the structure of the
> >>> remaining API.
> >>>
> >>> Cheers, Fabian
> >>>
> >>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra :
> >>>
>  Hi Bruno,
> 
>  Of course you can do that as well. (That's the good part :p
>  )
> 
>  I will open a PR soon with the proposed changes (first
>  without breaking
> >>> the
>  current Api) and I will post it here.
> 
>  Cheers, Gyula
> 
>  On Tuesday, April 21, 2015, Bruno Cadonna <
> >>> cado...@informatik.hu-berlin.de
> >
>  wrote:
> 
> > Hi Gyula,
> >
> > I have a question regarding your suggestion.
> >
> > Can the current continuous aggregation be also specified with your
> > proposed periodic aggregation?
> >
> > I am thinking about something like
> >
> > dataStream.reduce(...).every(Count.of(1))
> >
> > Cheers, Bruno
> >
> > On 20.04.2015 22:32, Gyula Fóra wrote:
> >>> Hey all,
> >>>
> >>> I think we are missing a quite useful feature that
> >>> could be implemented (with some slight modifications)
> >>> on top of the current windowing api.
> >>>
> >>> We currently provide 2 ways of aggregating (or
> >>> reducing) over streams: doing a continuous aggregation
> >>> and always output the aggregated value (which cannot be
> >>> done properly in parallel) or doing aggregation in a
> >>> window periodically.
> >>>
> >>> What we don't have at the moment is periodic
> >>> aggregations on the whole stream. I would even go as
> >>> far as to remove the continuous outputting
> >>> reduce/aggregate it and replace it with this version
> >>> as this in return can be done properly in parallel.
> >>>
> >>> My suggestion would be that a call:
> >>>
> >>> dataStream.reduce(..) dataStream.sum(..)
> >>>
> >>> would return a windowed data stream where the window is
> >>> the whole record history, and the user would need to
> >>> define a trigger to get the actual reduced values
> >>> like:
> >>>
> >>> dataStream.reduce(...).every(Time.of(4,sec)) to get the
> >>> actual reduced results. dataStream.sum(...).every(...)
> >>>
> >>> I think the current data stream reduce/aggregation is
> >>> very confusing without being practical for any normal
> >>> use-case.
> >>>
> >>> Also this would be a very api breaking change (but I
> >>> would still make this change as it is much more
> >>> intuitive than the current behaviour) so I would try to
> >>> push it before the release if we can agree.
> >>>
> >>> Cheers, Gyula
> >>>
> >
> >
> 
> >>>
> >>
> >>
> >
>
> - --
> ~~~
>
>   Dr. Bruno Cadonna
>   Postdoctoral Researcher
>
>   Databases and Information Systems
>   Department of Computer Science
>   Humboldt-Universität zu Berlin
>
>   http://www.informatik.hu-berlin.de/~cadonnab
>
> ~~~
>


Re: Periodic full stream aggregations

2015-04-21 Thread Bruno Cadonna

Hi Gyula,

I read your comments on your PR.

I have a question about this comment:

"It only allows aggregations so we dont need to keep the full history
in a buffer."

What if the user implements an aggregation function like a median?

For a median you need the full history, don't you?

Am I missing something?

Cheers,
Bruno

On 21.04.2015 14:31, Gyula Fóra wrote:
> I have opened a PR for this feature:
> 
> https://github.com/apache/flink/pull/614
> 
> Cheers, Gyula
> 
> On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra 
> wrote:
> 
>> Thats a good idea, I will modify my PR to that :)
>> 
>> Gyula
>> 
>> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske
>>  wrote:
>> 
>>> Is it possible to switch the order of the statements, i.e.,
>>> 
>>> dataStream.every(Time.of(4,sec)).reduce(...) instead of 
>>> dataStream.reduce(...).every(Time.of(4,sec))
>>> 
>>> I think that would be more consistent with the structure of the
>>> remaining API.
>>> 
>>> Cheers, Fabian
>>> 
>>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra :
>>> 
 Hi Bruno,
 
 Of course you can do that as well. (That's the good part :p
 )
 
 I will open a PR soon with the proposed changes (first
 without breaking
>>> the
 current Api) and I will post it here.
 
 Cheers, Gyula
 
 On Tuesday, April 21, 2015, Bruno Cadonna <
>>> cado...@informatik.hu-berlin.de
> 
 wrote:
 
> Hi Gyula,
> 
> I have a question regarding your suggestion.
> 
> Can the current continuous aggregation be also specified with your 
> proposed periodic aggregation?
> 
> I am thinking about something like
> 
> dataStream.reduce(...).every(Count.of(1))
> 
> Cheers, Bruno
> 
> On 20.04.2015 22:32, Gyula Fóra wrote:
>>> Hey all,
>>> 
>>> I think we are missing a quite useful feature that
>>> could be implemented (with some slight modifications)
>>> on top of the current windowing api.
>>> 
>>> We currently provide 2 ways of aggregating (or
>>> reducing) over streams: doing a continuous aggregation
>>> and always output the aggregated value (which cannot be
>>> done properly in parallel) or doing aggregation in a
>>> window periodically.
>>> 
>>> What we don't have at the moment is periodic
>>> aggregations on the whole stream. I would even go as
>>> far as to remove the continuous outputting
>>> reduce/aggregate it and replace it with this version
>>> as this in return can be done properly in parallel.
>>> 
>>> My suggestion would be that a call:
>>> 
>>> dataStream.reduce(..) dataStream.sum(..)
>>> 
>>> would return a windowed data stream where the window is
>>> the whole record history, and the user would need to
>>> define a trigger to get the actual reduced values
>>> like:
>>> 
>>> dataStream.reduce(...).every(Time.of(4,sec)) to get the
>>> actual reduced results. dataStream.sum(...).every(...)
>>> 
>>> I think the current data stream reduce/aggregation is
>>> very confusing without being practical for any normal
>>> use-case.
>>> 
>>> Also this would be a very api breaking change (but I
>>> would still make this change as it is much more
>>> intuitive than the current behaviour) so I would try to
>>> push it before the release if we can agree.
>>> 
>>> Cheers, Gyula
>>> 
> 
> 
 
>>> 
>> 
>> 
> 

- -- 
~~~

  Dr. Bruno Cadonna
  Postdoctoral Researcher

  Databases and Information Systems
  Department of Computer Science
  Humboldt-Universität zu Berlin

  http://www.informatik.hu-berlin.de/~cadonnab

~~~


Re: Periodic full stream aggregations

2015-04-21 Thread Gyula Fóra
I have opened a PR for this feature:

https://github.com/apache/flink/pull/614

Cheers,
Gyula

On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra  wrote:

> Thats a good idea, I will modify my PR to that :)
>
> Gyula
>
> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske  wrote:
>
>> Is it possible to switch the order of the statements, i.e.,
>>
>> dataStream.every(Time.of(4,sec)).reduce(...) instead of
>> dataStream.reduce(...).every(Time.of(4,sec))
>>
>> I think that would be more consistent with the structure of the remaining
>> API.
>>
>> Cheers, Fabian
>>
>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra :
>>
>> > Hi Bruno,
>> >
>> > Of course you can do that as well. (That's the good part :p )
>> >
>> > I will open a PR soon with the proposed changes (first without breaking
>> the
>> > current Api) and I will post it here.
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Tuesday, April 21, 2015, Bruno Cadonna <
>> cado...@informatik.hu-berlin.de
>> > >
>> > wrote:
>> >
>> > >
>> > > Hi Gyula,
>> > >
>> > > I have a question regarding your suggestion.
>> > >
>> > > Can the current continuous aggregation be also specified with your
>> > > proposed periodic aggregation?
>> > >
>> > > I am thinking about something like
>> > >
>> > > dataStream.reduce(...).every(Count.of(1))
>> > >
>> > > Cheers,
>> > > Bruno
>> > >
>> > > On 20.04.2015 22:32, Gyula Fóra wrote:
>> > > > Hey all,
>> > > >
>> > > > I think we are missing a quite useful feature that could be
>> > > > implemented (with some slight modifications) on top of the current
>> > > > windowing api.
>> > > >
>> > > > We currently provide 2 ways of aggregating (or reducing) over
>> > > > streams: doing a continuous aggregation and always output the
>> > > > aggregated value (which cannot be done properly in parallel) or
>> > > > doing aggregation in a window periodically.
>> > > >
>> > > > What we don't have at the moment is periodic aggregations on the
>> > > > whole stream. I would even go as far as to remove the continuous
>> > > > outputting reduce/aggregate it and replace it with this version as
>> > > > this in return can be done properly in parallel.
>> > > >
>> > > > My suggestion would be that a call:
>> > > >
>> > > > dataStream.reduce(..) dataStream.sum(..)
>> > > >
>> > > > would return a windowed data stream where the window is the whole
>> > > > record history, and the user would need to define a trigger to get
>> > > > the actual reduced values like:
>> > > >
>> > > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
>> > > > reduced results. dataStream.sum(...).every(...)
>> > > >
>> > > > I think the current data stream reduce/aggregation is very
>> > > > confusing without being practical for any normal use-case.
>> > > >
>> > > > Also this would be a very api breaking change (but I would still
>> > > > make this change as it is much more intuitive than the current
>> > > > behaviour) so I would try to push it before the release if we can
>> > > > agree.
>> > > >
>> > > > Cheers, Gyula
>> > > >
>> > >
>> > > - --
>> > > ~~~
>> > >
>> > >   Dr. Bruno Cadonna
>> > >   Postdoctoral Researcher
>> > >
>> > >   Databases and Information Systems
>> > >   Department of Computer Science
>> > >   Humboldt-Universität zu Berlin
>> > >
>> > >   http://www.informatik.hu-berlin.de/~cadonnab
>> > >
>> > > ~~~
>> > >
>> >
>>
>
>


Re: Akka transparency and serialisation

2015-04-21 Thread Stephan Ewen
Good point to raise, Paris.

Here are the practices I (and others) have been using; they work well:

1) Do not assume serialization, that is true. If you need to make sure that
the instance of the data is not shared after the message, send a manually
serialized version. The "InstantiationUtil" has methods to
serialize/deserialize into/from byte arrays.

2) If the data involves user-defined classes, always serialize manually,
because the deserialization in the Akka stack will not use the required
user-code classloader. Have a look at the class "SerializedValue", which
eagerly serializes and lazily deserializes (with a given class loader) to
overcome these situations (a standalone sketch follows below the list).

3) I totally agree to not make any assumptions on the behavior of transient
fields.
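
Here is a minimal standalone sketch of practices (1)/(2): eagerly serialize a payload to a byte[] at the sender and lazily deserialize it with an explicit user-code class loader at the receiver. Flink's InstantiationUtil / SerializedValue cover this; the plain-Java version below is only an illustration of the idea.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ManualSerialization {

    // Eager serialization at the sender: the receiver never shares the instance.
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        return bos.toByteArray();
    }

    // Lazy deserialization at the receiver, with an explicit (e.g. user-code) class loader.
    static Object deserialize(byte[] bytes, final ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                return Class.forName(desc.getName(), false, classLoader);
            }
        }) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize("hello");
        System.out.println(deserialize(bytes, ManualSerialization.class.getClassLoader()));
    }
}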

Stephan


On Tue, Apr 21, 2015 at 1:41 PM, Paris Carbone  wrote:

> Hello everyone,
>
>
> Many of you are already aware of this but it is good to make it clear in
> the mailing list. We bumped into this "special" case with Akka several times
> already and it is important to know where transparency actually breaks.
>
>
> In short, Akka serialises only messages that get transferred over the wire
> or across JVMs [1]. Thus, we should not rely on messages getting serialised
> for anything we want to transfer using Akka. To overcome this we should
> either:
>
>
> 1) Do a deep copy of everything passed via Akka messaging
>
> 2) Apply serialisation manually before sending messages and transfer only
> pre-serialized data.
>
> 3) Never rely on transient fields
>
>
> cheers
>
> Paris
>
>
> [1] http://doc.akka.io/docs/akka/snapshot/general/remoting.html
>
>


Akka transparency and serialisation

2015-04-21 Thread Paris Carbone
Hello everyone,


Many of you are already aware of this but it is good to make it clear in the 
mailing list. We bumped into this "special" case with Akka several times already and 
it is important to know where transparency actually breaks.


In short, Akka serialises only messages that get transferred over the wire or 
across JVMs [1]. Thus, we should not rely on messages getting serialised for 
anything we want to transfer using Akka. To overcome this we should either:


1) Do a deep copy of everything passed via Akka messaging

2) Apply serialisation manually before sending messages and transfer only 
pre-serialized data.

3) Never rely on transient fields


cheers

Paris


[1] http://doc.akka.io/docs/akka/snapshot/general/remoting.html



Re: Periodic full stream aggregations

2015-04-21 Thread Gyula Fóra
That's a good idea, I will modify my PR to do that :)

Gyula

On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske  wrote:

> Is it possible to switch the order of the statements, i.e.,
>
> dataStream.every(Time.of(4,sec)).reduce(...) instead of
> dataStream.reduce(...).every(Time.of(4,sec))
>
> I think that would be more consistent with the structure of the remaining
> API.
>
> Cheers, Fabian
>
> 2015-04-21 10:57 GMT+02:00 Gyula Fóra :
>
> > Hi Bruno,
> >
> > Of course you can do that as well. (That's the good part :p )
> >
> > I will open a PR soon with the proposed changes (first without breaking
> the
> > current Api) and I will post it here.
> >
> > Cheers,
> > Gyula
> >
> > On Tuesday, April 21, 2015, Bruno Cadonna <
> cado...@informatik.hu-berlin.de
> > >
> > wrote:
> >
> > >
> > > Hi Gyula,
> > >
> > > I have a question regarding your suggestion.
> > >
> > > Can the current continuous aggregation be also specified with your
> > > proposed periodic aggregation?
> > >
> > > I am thinking about something like
> > >
> > > dataStream.reduce(...).every(Count.of(1))
> > >
> > > Cheers,
> > > Bruno
> > >
> > > On 20.04.2015 22:32, Gyula Fóra wrote:
> > > > Hey all,
> > > >
> > > > I think we are missing a quite useful feature that could be
> > > > implemented (with some slight modifications) on top of the current
> > > > windowing api.
> > > >
> > > > We currently provide 2 ways of aggregating (or reducing) over
> > > > streams: doing a continuous aggregation and always output the
> > > > aggregated value (which cannot be done properly in parallel) or
> > > > doing aggregation in a window periodically.
> > > >
> > > > What we don't have at the moment is periodic aggregations on the
> > > > whole stream. I would even go as far as to remove the continuous
> > > > outputting reduce/aggregate it and replace it with this version as
> > > > this in return can be done properly in parallel.
> > > >
> > > > My suggestion would be that a call:
> > > >
> > > > dataStream.reduce(..) dataStream.sum(..)
> > > >
> > > > would return a windowed data stream where the window is the whole
> > > > record history, and the user would need to define a trigger to get
> > > > the actual reduced values like:
> > > >
> > > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
> > > > reduced results. dataStream.sum(...).every(...)
> > > >
> > > > I think the current data stream reduce/aggregation is very
> > > > confusing without being practical for any normal use-case.
> > > >
> > > > Also this would be a very api breaking change (but I would still
> > > > make this change as it is much more intuitive than the current
> > > > behaviour) so I would try to push it before the release if we can
> > > > agree.
> > > >
> > > > Cheers, Gyula
> > > >
> > >
> > > - --
> > > ~~~
> > >
> > >   Dr. Bruno Cadonna
> > >   Postdoctoral Researcher
> > >
> > >   Databases and Information Systems
> > >   Department of Computer Science
> > >   Humboldt-Universität zu Berlin
> > >
> > >   http://www.informatik.hu-berlin.de/~cadonnab
> > >
> > > ~~~
> > >
> >
>


[jira] [Created] (FLINK-1918) NullPointerException at org.apache.flink.client.program.Client's constructor while using ExecutionEnvironment.createRemoteEnvironment

2015-04-21 Thread JIRA
Zoltán Zvara created FLINK-1918:
---

 Summary: NullPointerException at 
org.apache.flink.client.program.Client's constructor while using 
ExecutionEnvironment.createRemoteEnvironment
 Key: FLINK-1918
 URL: https://issues.apache.org/jira/browse/FLINK-1918
 Project: Flink
  Issue Type: Bug
  Components: YARN Client
Reporter: Zoltán Zvara


Trace:

{code}
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.client.program.Client.<init>(Client.java:104)
at 
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:86)
at 
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
at 
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:70)
at Wordcount.main(Wordcount.java:23)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
{code}

The constructor is trying to set the configuration parameter 
{{jobmanager.rpc.address}} with 
{{jobManagerAddress.getAddress().getHostAddress()}}, but 
{{jobManagerAddress.holder.addr}} is {{null}}. 
{{jobManagerAddress.holder.hostname}} and {{port}} hold the valid information.
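
For illustration, the reported state matches an unresolved {{InetSocketAddress}} (sketch of the symptom, not the actual Client code):

{code}
import java.net.InetSocketAddress;

// For an unresolved address, getAddress() is null while hostname and port are set,
// so getAddress().getHostAddress() throws the NPE; getHostString()/getPort() still work.
public class UnresolvedAddressDemo {
    public static void main(String[] args) {
        InetSocketAddress jobManagerAddress =
                InetSocketAddress.createUnresolved("jobmanager.example.com", 6123);

        System.out.println(jobManagerAddress.getAddress());    // null
        System.out.println(jobManagerAddress.getHostString()); // jobmanager.example.com
        System.out.println(jobManagerAddress.getPort());       // 6123
    }
}
{code}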





[jira] [Created] (FLINK-1917) EOFException when running delta-iteration job

2015-04-21 Thread Stefan Bunk (JIRA)
Stefan Bunk created FLINK-1917:
--

 Summary: EOFException when running delta-iteration job
 Key: FLINK-1917
 URL: https://issues.apache.org/jira/browse/FLINK-1917
 Project: Flink
  Issue Type: Bug
  Components: Core, Distributed Runtime, Iterations
 Environment: 0.9-milestone-1
Exception on the cluster, local execution works
Reporter: Stefan Bunk


The delta-iteration program in [1] ends with an

java.io.EOFException
at 
org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
at 
org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
at 
org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
at 
org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeLong(AbstractPagedOutputView.java:291)
at 
org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeDouble(AbstractPagedOutputView.java:307)
at 
org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:62)
at 
org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:26)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at 
org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at 
org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at 
org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at 
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)

For logs and the accompanying mailing list discussion see below.

When running with slightly different memory configuration, as hinted on the 
mailing list, I sometimes also get this exception:

19.Apr. 13:39:29 INFO  Task - IterationHead(WorksetIteration 
(Resolved-Redirects)) (10/10) switched to FAILED : 
java.lang.IndexOutOfBoundsException: Index: 161, Size: 161
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at 
org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.resetTo(InMemoryPartition.java:352)
at 
org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.access$100(InMemoryPartition.java:301)
at 
org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:226)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at 
org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at 
org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at 
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)

[1] Flink job: https://gist.github.com/knub/0bfec859a563009c1d57
[2] Job manager logs: https://gist.github.com/knub/01e3a4b0edb8cde66ff4
[3] One task manager's logs: https://gist.github.com/knub/8f2f953da95c8d7adefc
[4] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-when-running-Flink-job-td1092.html





[jira] [Created] (FLINK-1916) EOFException when running delta-iteration job

2015-04-21 Thread Stefan Bunk (JIRA)
Stefan Bunk created FLINK-1916:
--

 Summary: EOFException when running delta-iteration job
 Key: FLINK-1916
 URL: https://issues.apache.org/jira/browse/FLINK-1916
 Project: Flink
  Issue Type: Bug
 Environment: 0.9-milestone-1
Exception on the cluster, local execution works
Reporter: Stefan Bunk


The delta-iteration program in [1] ends with an

java.io.EOFException
at 
org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
at 
org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
at 
org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
at 
org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeLong(AbstractPagedOutputView.java:291)
at 
org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeDouble(AbstractPagedOutputView.java:307)
at 
org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:62)
at 
org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:26)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at 
org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at 
org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at 
org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at 
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)

For logs and the accompanying mailing list discussion see below.

When running with slightly different memory configuration, as hinted on the 
mailing list, I sometimes also get this exception:

19.Apr. 13:39:29 INFO  Task - IterationHead(WorksetIteration 
(Resolved-Redirects)) (10/10) switched to FAILED : 
java.lang.IndexOutOfBoundsException: Index: 161, Size: 161
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at 
org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.resetTo(InMemoryPartition.java:352)
at 
org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.access$100(InMemoryPartition.java:301)
at 
org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:226)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at 
org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at 
org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at 
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)

[1] Flink job: https://gist.github.com/knub/0bfec859a563009c1d57
[2] Job manager logs: https://gist.github.com/knub/01e3a4b0edb8cde66ff4
[3] One task manager's logs: https://gist.github.com/knub/8f2f953da95c8d7adefc
[4] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-when-running-Flink-job-td1092.html





Re: About Operator and OperatorBase

2015-04-21 Thread Stephan Ewen
Originally, we had multiple APIs with different data models: the current
Java API, the Record API, and a JSON API. The common API was the
data-model-agnostic set of operators on which they were built.

It became redundant when we saw how well things can be built on top of
the Java API, using the TypeInformation. Now Scala, Python, and Dataflow all
build on top of the Java API.


Re: Periodic full stream aggregations

2015-04-21 Thread Fabian Hueske
Is it possible to switch the order of the statements, i.e.,

dataStream.every(Time.of(4,sec)).reduce(...) instead of
dataStream.reduce(...).every(Time.of(4,sec))

I think that would be more consistent with the structure of the remaining
API.

Cheers, Fabian

2015-04-21 10:57 GMT+02:00 Gyula Fóra :

> Hi Bruno,
>
> Of course you can do that as well. (That's the good part :p )
>
> I will open a PR soon with the proposed changes (first without breaking the
> current Api) and I will post it here.
>
> Cheers,
> Gyula
>
> On Tuesday, April 21, 2015, Bruno Cadonna  >
> wrote:
>
> >
> > Hi Gyula,
> >
> > I have a question regarding your suggestion.
> >
> > Can the current continuous aggregation be also specified with your
> > proposed periodic aggregation?
> >
> > I am thinking about something like
> >
> > dataStream.reduce(...).every(Count.of(1))
> >
> > Cheers,
> > Bruno
> >
> > On 20.04.2015 22:32, Gyula Fóra wrote:
> > > Hey all,
> > >
> > > I think we are missing a quite useful feature that could be
> > > implemented (with some slight modifications) on top of the current
> > > windowing api.
> > >
> > > We currently provide 2 ways of aggregating (or reducing) over
> > > streams: doing a continuous aggregation and always output the
> > > aggregated value (which cannot be done properly in parallel) or
> > > doing aggregation in a window periodically.
> > >
> > > What we don't have at the moment is periodic aggregations on the
> > > whole stream. I would even go as far as to remove the continuous
> > > outputting reduce/aggregate it and replace it with this version as
> > > this in return can be done properly in parallel.
> > >
> > > My suggestion would be that a call:
> > >
> > > dataStream.reduce(..) dataStream.sum(..)
> > >
> > > would return a windowed data stream where the window is the whole
> > > record history, and the user would need to define a trigger to get
> > > the actual reduced values like:
> > >
> > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
> > > reduced results. dataStream.sum(...).every(...)
> > >
> > > I think the current data stream reduce/aggregation is very
> > > confusing without being practical for any normal use-case.
> > >
> > > Also this would be a very api breaking change (but I would still
> > > make this change as it is much more intuitive than the current
> > > behaviour) so I would try to push it before the release if we can
> > > agree.
> > >
> > > Cheers, Gyula
> > >
> >
> > - --
> > ~~~
> >
> >   Dr. Bruno Cadonna
> >   Postdoctoral Researcher
> >
> >   Databases and Information Systems
> >   Department of Computer Science
> >   Humboldt-Universität zu Berlin
> >
> >   http://www.informatik.hu-berlin.de/~cadonnab
> >
> > ~~~
> >
>


Re: Periodic full stream aggregations

2015-04-21 Thread Gyula Fóra
Hi Bruno,

Of course you can do that as well. (That's the good part :p )

I will open a PR soon with the proposed changes (first without breaking the
current API) and I will post it here.

Cheers,
Gyula

On Tuesday, April 21, 2015, Bruno Cadonna 
wrote:

>
> Hi Gyula,
>
> I have a question regarding your suggestion.
>
> Can the current continuous aggregation be also specified with your
> proposed periodic aggregation?
>
> I am thinking about something like
>
> dataStream.reduce(...).every(Count.of(1))
>
> Cheers,
> Bruno
>
> On 20.04.2015 22:32, Gyula Fóra wrote:
> > Hey all,
> >
> > I think we are missing a quite useful feature that could be
> > implemented (with some slight modifications) on top of the current
> > windowing api.
> >
> > We currently provide 2 ways of aggregating (or reducing) over
> > streams: doing a continuous aggregation and always output the
> > aggregated value (which cannot be done properly in parallel) or
> > doing aggregation in a window periodically.
> >
> > What we don't have at the moment is periodic aggregations on the
> > whole stream. I would even go as far as to remove the continuous
> > outputting reduce/aggregate it and replace it with this version as
> > this in return can be done properly in parallel.
> >
> > My suggestion would be that a call:
> >
> > dataStream.reduce(..) dataStream.sum(..)
> >
> > would return a windowed data stream where the window is the whole
> > record history, and the user would need to define a trigger to get
> > the actual reduced values like:
> >
> > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
> > reduced results. dataStream.sum(...).every(...)
> >
> > I think the current data stream reduce/aggregation is very
> > confusing without being practical for any normal use-case.
> >
> > Also this would be a very api breaking change (but I would still
> > make this change as it is much more intuitive than the current
> > behaviour) so I would try to push it before the release if we can
> > agree.
> >
> > Cheers, Gyula
> >
>
> - --
> ~~~
>
>   Dr. Bruno Cadonna
>   Postdoctoral Researcher
>
>   Databases and Information Systems
>   Department of Computer Science
>   Humboldt-Universität zu Berlin
>
>   http://www.informatik.hu-berlin.de/~cadonnab
>
> ~~~
>


Re: Periodic full stream aggregations

2015-04-21 Thread Bruno Cadonna

Hi Gyula,

I have a question regarding your suggestion.

Can the current continuous aggregation be also specified with your
proposed periodic aggregation?

I am thinking about something like

dataStream.reduce(...).every(Count.of(1))
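
In other words, a standalone sketch of the intended semantics (not the actual API): a running reduce over the whole stream whose current value is emitted whenever a trigger fires, where a count-1 trigger reproduces the continuous behaviour:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Keeps a running reduce over the whole stream and emits the current value
// every `every` elements (a count trigger standing in for every(Count.of(n))).
public class PeriodicFullStreamReduce {

    static <T> List<T> reduceEvery(Iterable<T> stream, BinaryOperator<T> reducer, int every) {
        List<T> emitted = new ArrayList<T>();
        T current = null;
        int sinceLastEmit = 0;
        for (T element : stream) {
            current = (current == null) ? element : reducer.apply(current, element);
            if (++sinceLastEmit == every) { // trigger fires
                emitted.add(current);
                sinceLastEmit = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> stream = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(reduceEvery(stream, Integer::sum, 2)); // [3, 10, 21]
        System.out.println(reduceEvery(stream, Integer::sum, 1)); // [1, 3, 6, 10, 15, 21] -- the continuous case
    }
}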

Cheers,
Bruno

On 20.04.2015 22:32, Gyula Fóra wrote:
> Hey all,
> 
> I think we are missing a quite useful feature that could be
> implemented (with some slight modifications) on top of the current
> windowing api.
> 
> We currently provide 2 ways of aggregating (or reducing) over
> streams: doing a continuous aggregation and always output the
> aggregated value (which cannot be done properly in parallel) or
> doing aggregation in a window periodically.
> 
> What we don't have at the moment is periodic aggregations on the
> whole stream. I would even go as far as to remove the continuous
> outputting reduce/aggregate it and replace it with this version as
> this in return can be done properly in parallel.
> 
> My suggestion would be that a call:
> 
> dataStream.reduce(..) dataStream.sum(..)
> 
> would return a windowed data stream where the window is the whole
> record history, and the user would need to define a trigger to get
> the actual reduced values like:
> 
> dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
> reduced results. dataStream.sum(...).every(...)
> 
> I think the current data stream reduce/aggregation is very
> confusing without being practical for any normal use-case.
> 
> Also this would be a very api breaking change (but I would still
> make this change as it is much more intuitive than the current
> behaviour) so I would try to push it before the release if we can
> agree.
> 
> Cheers, Gyula
> 

- -- 
~~~

  Dr. Bruno Cadonna
  Postdoctoral Researcher

  Databases and Information Systems
  Department of Computer Science
  Humboldt-Universität zu Berlin

  http://www.informatik.hu-berlin.de/~cadonnab

~~~


Fault Tolerance for Flink Iterations

2015-04-21 Thread Markus Holzemer
Hi everybody,

I am writing my master's thesis about making Flink iterations / iterative
Flink algorithms fault-tolerant.
The first approach I implemented is basic checkpointing, where the current
state is saved into HDFS every N iterations.
To do this I enabled data sinks inside of iterations, then attached a new
checkpointing sink to the beginning of each iteration. To recover from a
previous checkpoint I cancel all tasks, add a new data source in front of
the iteration and reschedule the tasks with a lower degree of parallelism (DOP).
I do this from the JobManager at runtime without starting a new job.
The problem is that sometimes the input data to the iteration has some
properties like a certain partitioning or sorting, and I am struggling
with reconstructing these properties from the checkpoint source.
I figured that an easier way to do this is to re-optimize the new plan
(with the new source as input to the iteration) before the rescheduling.
But in the current project structure flink-runtime has no access to
flink-optimizer, and it would be a major design break to change this.
Does anybody have advice on this?

best,
Markus