Sliding Window Join (without duplicates)

2015-11-23 Thread Matthias J. Sax
Hi,

it seems that a join of two data streams with an overlapping sliding
window produces duplicates in the output. The default implementation
internally just uses two nested loops over both windows to compute the
result.
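
For reference, a minimal sketch of the kind of job I mean (a sketch only;
the assigner and method names follow the current windowing API, so the exact
0.10 names may differ, and the data is illustrative). With a 10s window
sliding by 5s, every matching pair falls into two overlapping windows and is
emitted once per window:

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlidingJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> left =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
        DataStream<Tuple2<String, Integer>> right =
                env.fromElements(Tuple2.of("a", 10), Tuple2.of("b", 20));

        KeySelector<Tuple2<String, Integer>, String> byKey =
                new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> t) { return t.f0; }
                };

        // Each instant is covered by two 10s/5s windows, so every matching
        // pair is evaluated (and emitted) once per window: duplicates.
        left.join(right)
            .where(byKey)
            .equalTo(byKey)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
                           Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> join(Tuple2<String, Integer> l,
                                                    Tuple2<String, Integer> r) {
                    return Tuple2.of(l.f0, l.f1 + r.f1);
                }
            })
            .print();

        env.execute("sliding window join");
    }
}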

How can duplicates be avoided? Is there any way to do this right now? If
not, should we add it?


-Matthias





Re: Either left() vs left(value)

2015-11-23 Thread Vasiliki Kalavri
Hey Gyula,

I don't think dropping the method is a good idea. We need a way to retrieve
left and right values, no?
How about renaming to getLeft() / getRight()?

-V.

On 23 November 2015 at 09:55, Gyula Fóra  wrote:

> Hey guys,
>
> I know this should have been part of the PR discussion but it kind of
> slipped through the cracks :)
>
> I think it might be useful to change the method name for Either.left(value)
> to Either.Left(value) (or drop the method completely).
>
> The reason is that it is slightly awkward to use it with Java 8 lambdas.
> You cannot use Either::left because of the name clash. Maybe it's not a
> huge issue, but it is a small inconvenience that will come up more often as
> we are gradually moving to Java 8 anyways :)
>
> What do you think?
> Gyula
>


Re: Either left() vs left(value)

2015-11-23 Thread Gyula Fóra
I was actually not suggesting to drop the e.left() method but instead the
Either.left(val).
Renaming the left(), right() methods might be confusing, as then it would be
inconsistent with the Scala version.

On the other hand, we could change the way the user creates the Left/Right
classes, maybe directly expose them instead of the static method (or
rename the static method).

Gyula

Vasiliki Kalavri wrote (on Mon, 23 Nov 2015, 20:14):

> Hey Gyula,
>
> I don't think dropping the method is a good idea. We need a way to retrieve
> left and right values, no?
> How about renaming to getLeft() / getRight()?
>
> -V.


Re: Either left() vs left(value)

2015-11-23 Thread Vasiliki Kalavri
Ah I see. Well, as I also said in the PR, Left and Right make no sense on
their own, they're helper classes for Either. Hence, I believe they should
be private. Maybe we could rename the methods to createLeft() /
createRight()?

On 23 November 2015 at 20:58, Gyula Fóra  wrote:

> I was actually not suggesting to drop the e.left() method but instead the
> Either.left(val).
> Renaming the left(), right() methods might be confusing, as then it would be
> inconsistent with the Scala version.
>
> On the other hand, we could change the way the user creates the Left/Right
> classes, maybe directly expose them instead of the static method (or
> rename the static method).
>
> Gyula


Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Gyula Fóra
Yes, you are right I think we should have some nice abstractions for doing
this.

Before the rewrite of the windowing runtime to support out-of-order events,
we had abstractions for supporting this, but that code was not feasible
from a performance perspective. (The result of a keyed window reduce used to
be a window containing all the aggregates, and one could then just aggregate
again on the result without specifying the window again.)

Maybe we could implement similar abstractions on the new window runtime, I
think that would be really awesome.

Gyula

Konstantin Knauf wrote (on Mon, 23 Nov 2015, 11:40):

> Thanks!
>
> @Fabian: Yepp, but this still results in multiple outputs per window,
> because the maximum is emitted for every key.
>
> @Gyula: Yepp, that's the second bullet point from my question ;) The way
> I implemented it, it basically doubles the latency, because the
> timeWindowAll has to wait for the next timeWindow before it can close
> the previous one. So if the first timeWindow is 10s, it takes 20s until
> you have a result, although it can't change after 10s. You know what I mean?
>
> Cheers,
>
> Konstantin
>
> On 23.11.2015 11:32, Gyula Fóra wrote:
> > Hi,
> >
> > Alright it seems there are multiple ways of doing this.
> >
> > I would do something like:
> >
> > ds.keyBy(key)
> > .timeWindow(w)
> > .reduce(...)
> > .timeWindowAll(w)
> > .reduce(...)
> >
> > Maybe Aljoscha could jump in here :D
> >
> > Cheers,
> > Gyula
> >
> > Fabian Hueske wrote (on Mon, 23 Nov 2015, 11:21):
> >
> > If you set the key to the time attribute, the "old" key is no longer
> > valid.
> > The streams are organized by time and only one aggregate for each
> > window-time should be computed.
> >
> > This should do what you are looking for:
> >
> > DataStream
> >   .keyBy(_._1) // key by original key
> >   .timeWindow(..)
> >   .apply(...)  // extract window end time: (origKey, time, agg)
> >   .keyBy(_._2) // key by time field
> >   .maxBy(_._3) // value with max agg field
> >
> > Best, Fabian
> >
> > 2015-11-23 11:00 GMT+01:00 Konstantin Knauf:
> >
> > Hi Fabian,
> >
> > thanks for your answer. Yes, that's what I want.
> >
> > The solution you suggest is what I am doing right now (see the last
> > bullet point in my question).
> >
> > But given your example. I would expect the following output:
> >
> > (key: 1, w-time: 10, agg: 17)
> > (key: 2, w-time: 10, agg: 20)
> > (key: 1, w-time: 20, agg: 30)
> > (key: 1, w-time: 20, agg: 30)
> > (key: 1, w-time: 20, agg: 30)
> >
> > Because the reduce function is evaluated for every incoming
> > event (i.e.
> > each key), right?
> >
> > Cheers,
> >
> > Konstantin
> >
> > On 23.11.2015 10:47, Fabian Hueske wrote:
> > > Hi Konstantin,
> > >
> > > let me first summarize to make sure I understood what you are
> looking for.
> > > You computed an aggregate over a keyed event-time window and
> you are
> > > looking for the maximum aggregate for each group of windows
> over the
> > > same period of time.
> > > So if you have
> > > (key: 1, w-time: 10, agg: 17)
> > > (key: 2, w-time: 10, agg: 20)
> > > (key: 1, w-time: 20, agg: 30)
> > > (key: 2, w-time: 20, agg: 28)
> > > (key: 3, w-time: 20, agg: 5)
> > >
> > > you would like to get:
> > > (key: 2, w-time: 10, agg: 20)
> > > (key: 1, w-time: 20, agg: 30)
> > >
> > > If this is correct, you can do this as follows.
> > > You can extract the window start and end time from the
> TimeWindow
> > > parameter of the WindowFunction and key the stream either by
> start or
> > > end time and apply a ReduceFunction on the keyed stream.
> > >
> > > Best, Fabian
> > >
> > > 2015-11-23 8:41 GMT+01:00 Konstantin Knauf <konstantin.kn...@tngtech.com>:
> > >
> > > Hi everyone,
> > >
> > > me again :) Let's say you have a stream, and for every
> > window and key
> > > you compute some aggregate value, like this:
> > >
> > > DataStream.keyBy(..)
> > >   .timeWindow(..)
> > >   .apply(...)
> > >
> > >
> > > Now I want to get the maximum aggregate value for every
> > window over the
> > > keys. This feels like a pretty natural use case. How can I
> > achieve this
> > > 

Either left() vs left(value)

2015-11-23 Thread Gyula Fóra
Hey guys,

I know this should have been part of the PR discussion but it kind of
slipped through the cracks :)

I think it might be useful to change the method name for Either.left(value)
to Either.Left(value) (or drop the method completely).

The reason is that it is slightly awkward to use it with Java 8 lambdas.
You cannot use Either::left because of the name clash. Maybe it's not a
huge issue, but it is a small inconvenience that will come up more often as
we are gradually moving to Java 8 anyways :)
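
To make the clash concrete, here is a self-contained sketch (simplified
stand-in classes, not Flink's actual Either):

import java.util.function.Function;

public class EitherClash {
    // Simplified stand-in that has both a static factory and an accessor.
    abstract static class Either<L, R> {
        static <L, R> Either<L, R> left(L value) { return new Left<>(value); } // factory
        abstract L left();                                                     // accessor
    }

    static final class Left<L, R> extends Either<L, R> {
        private final L value;
        Left(L value) { this.value = value; }
        @Override L left() { return value; }
    }

    public static void main(String[] args) {
        // Does not compile: both the static factory left(L) and the unbound
        // instance accessor left() are applicable, so javac rejects the
        // reference as ambiguous (the same effect as Integer::toString):
        // Function<Either<String, Integer>, String> getLeft = Either::left;

        // An explicit lambda works, but the method reference is lost:
        Function<Either<String, Integer>, String> getLeft = e -> e.left();
        System.out.println(getLeft.apply(Either.left("works")));
    }
}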

What do you think?
Gyula


Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Aljoscha Krettek
Hi,
@Konstantin: are you using event-time or processing-time windows? If you are
using processing time, then you can only do it the way Fabian suggested. The
problem here is, however, that the .keyBy().reduce() combination would emit a
new maximum for every element that arrives there, and you never know when you
have seen the final element, i.e. the maximum.

If you are using event-time, then you are indeed lucky, because then you can
use what Gyula suggested and you won’t have the extra latency, if I’m correct.
The reason is that the watermark that flushes out the first (keyed) windows
will also flush out the elements in the all-window. So the keyed window will
do its computations, send along the elements, and then, after it is done,
forward the watermark. This watermark will immediately trigger computation of
the all-window for the same time period.
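
A minimal sketch of that pipeline (method names as in the later 1.x docs, so
the exact 0.10 names may differ; the event timestamps ride along in the
elements):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MaxPerWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // (key, event timestamp in ms, aggregate value)
        DataStream<Tuple3<String, Long, Integer>> input = env
                .fromElements(
                        Tuple3.of("a", 1000L, 17), Tuple3.of("b", 2000L, 20),
                        Tuple3.of("a", 11000L, 30), Tuple3.of("b", 12000L, 28))
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<Tuple3<String, Long, Integer>>() {
                            @Override
                            public long extractAscendingTimestamp(Tuple3<String, Long, Integer> t) {
                                return t.f1;
                            }
                        });

        input.keyBy(0)                               // stage 1: aggregate per key
             .timeWindow(Time.seconds(10))
             .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2))
             .timeWindowAll(Time.seconds(10))        // stage 2: max over all keys
             .reduce((a, b) -> a.f2 >= b.f2 ? a : b)
             .print();
        // The watermark that closes a stage-1 window also closes the matching
        // stage-2 window right away, so no extra window length of latency.

        env.execute("max aggregate per window");
    }
}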

Cheers,
Aljoscha

Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Stephan Ewen
One addition: You can set the system to use "ingestion time", which gives
you event time with auto-generated timestamps and watermarks, based on the
time at which the events are seen in the sources.

That way you have the same simplicity as processing time, and you get the
window alignment that Aljoscha described (the second, total-max window has
the same elements as the initial max-per-key window).
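
A sketch of that setup (one setting on the environment; names as in the
current docs):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IngestionTimeSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Sources attach their wall-clock time as the event timestamp and emit
        // watermarks automatically; no TimestampExtractor is needed.
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    }
}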


[VOTE] Release Apache Flink 0.10.1 (release-0.10.1-rc1)

2015-11-23 Thread Robert Metzger
Hi All,

this is the first bugfix release for the 0.10 series of Flink.
I've CC'ed the user@ list in case users are interested in helping to verify
the release.

It contains fixes for critical issues, in particular:
- FLINK-3021 Fix class loading issue for streaming sources
- FLINK-2974 Add periodic offset committer for Kafka
- FLINK-2977 Using reflection to load HBase Kerberos tokens
- FLINK-3024 Fix TimestampExtractor.getCurrentWatermark() Behaviour
- FLINK-2967 Increase timeout for LOCAL_HOST address detection strategy
- FLINK-3025 [kafka consumer] Bump transitive ZkClient dependency
- FLINK-2989 job cancel button doesn't work on YARN
- FLINK-3032 Flink does not start on Hadoop 2.7.1 (HDP), due to a class
conflict
- FLINK-3011, 3019, 3028 Cancel jobs in RESTARTING state

This is the guide on how to verify a release:
https://cwiki.apache.org/confluence/display/FLINK/Releasing

During the testing, please focus on trying out Flink on different Hadoop
platforms: we changed how Hadoop's Maven dependencies are packaged,
so there may be issues with particular Hadoop distributions.
The Kafka consumer also changed a bit; it would be good to test it on a
cluster.

-

Please vote on releasing the following candidate as Apache Flink version
0.10.1:

The commit to be voted on:
http://git-wip-us.apache.org/repos/asf/flink/commit/2e9b2316

Branch:
release-0.10.1-rc1 (see
https://git1-us-west.apache.org/repos/asf/flink/?p=flink.git)

The release artifacts to be voted on can be found at:
http://people.apache.org/~rmetzger/flink-0.10.1-rc1/

The release artifacts are signed with the key with fingerprint D9839159:
http://www.apache.org/dist/flink/KEYS

The staging repository for this release can be found at:
https://repository.apache.org/content/repositories/orgapacheflink-1058

-

The vote is open for the next 72 hours and passes if a majority of at least
three +1 PMC votes are cast.

The vote ends on Wednesday, November 25.

[ ] +1 Release this package as Apache Flink 0.10.1
[ ] -1 Do not release this package because ...

===


Re: [DISCUSS] Release Flink 0.10.1 soon

2015-11-23 Thread Maximilian Michels
Thanks for being the release manager. I promise the release script works
like a charm :)

On Sun, Nov 22, 2015 at 12:30 PM, Robert Metzger 
wrote:

> It seems that we have merged all critical fixes into the release-0.10
> branch.
> Since nobody else stepped up as a release manager, I'll do it again.
> It has been a while. Let's see how the scripts have evolved ;)
>
> On Fri, Nov 20, 2015 at 2:21 PM, Matthias J. Sax  wrote:
>
> > Nice.
> >
> > I just would need to get some feedback about it -- I had to change
> > something in a "hacky way"... Maybe there is a better solution for it...
> >
> > https://github.com/apache/flink/pull/1387
> >
> > If there is no better idea about solving the naming issue, I would merge
> > it into master (there is no 0.10.1 branch yet, right?)
> >
> > -Matthias
> >
> > On 11/20/2015 01:54 PM, Till Rohrmann wrote:
> > > If it's not API breaking, then it can be included imo.
> > >
> > > On Fri, Nov 20, 2015 at 1:44 PM, Matthias J. Sax 
> > wrote:
> > >
> > >> If we find more bugs later on, we could have a 0.10.2, too.
> > >>
> > >> +1 for quick bug fix release.
> > >>
> > >> Question: should bug fix releases contain fixes for core components
> > >> only? I would have a fix for a bug in Storm compatibility -- not sure
> > >> if it should be included or not.
> > >>
> > >> -Matthias
> > >>
> > >> On 11/20/2015 12:35 PM, Till Rohrmann wrote:
> > >>> The optimizer bug (https://issues.apache.org/jira/browse/FLINK-3052)
> > >> should
> > >>> be fixed with https://github.com/apache/flink/pull/1388.
> > >>>
> > >>> On Fri, Nov 20, 2015 at 11:37 AM, Gyula Fóra 
> > >> wrote:
> > >>>
> >  Thanks guys,
> > 
> >  I understand your point and you are probably right; if this is a
> >  lightweight process, then the earlier the better :)
> > 
> >  Gyula
> >  On Fri, Nov 20, 2015 at 11:34 AM Ufuk Celebi 
> wrote:
> > 
> > > Hey Gyula,
> > >
> > > I understand your point, but we already have some important fixes
> for
> > > 0.10.1. It's fair to assume that we will find more issues in the
> > >> future,
> > > but the bugfix releases have way less overhead than the major
> > >> releases. I
> > > would still keep the ASAP schedule and would not wait longer
> (except
> > >> for
> > > the case that the we find an important issue that we want fixed).
> We
> > >> can
> > > always do a new bug fix release.
> > >
> > > – Ufuk
> > >
> > > On Fri, Nov 20, 2015 at 11:25 AM, Gyula Fóra wrote:
> > >
> > >> Hi all,
> > >>
> > >> Wouldn't you think that it would make sense to wait a week or so to
> > >> find all the hot issues with the current release?
> > >>
> > >> To me it feels a little bit like rushing this out and we will have
> > >> almost the same situation afterwards.
> > >>
> > >> I might be wrong, but I think people should get a chance to try this
> > >> out.
> > >>
> > >> In any case I would +1 for the quick release if everyone else thinks
> > >> that's the way; these are just my thoughts.
> > >>
> > >> Gyula
> > >> On Fri, Nov 20, 2015 at 11:13 AM Till Rohrmann <
> > trohrm...@apache.org>
> > >> wrote:
> > >>
> > >>> Actually, I still have another bug related to the optimizer which I
> > >>> would like to include if possible. The problem is that the optimizer
> > >>> is not able to push properties properly out of a bulk iteration,
> > >>> which in some cases can lead to rejected Flink jobs.
> > >>>
> > >>> On Fri, Nov 20, 2015 at 11:10 AM, Robert Metzger <
> >  rmetz...@apache.org>
> > >>> wrote:
> > >>>
> >  Great, thank you!
> > 
> >  Let me know if there is any issue, I'll address it asap. The PR is
> >  not building anymore because you've pushed an update to the Kafka
> >  documentation. I can rebase and merge the PR once you give me green
> >  light ;)
> > 
> >  Till has merged FLINK-3021, so we might be able to have a first RC
> >  today.
> > 
> > 
> >  On Fri, Nov 20, 2015 at 11:05 AM, Stephan Ewen <
> se...@apache.org>
> > >> wrote:
> > 
> > > Let me look at FLINK-2974 (open PR) to see if it can be
> merged...
> > >
> > > On Thu, Nov 19, 2015 at 10:09 PM, Robert Metzger <
> > >> rmetz...@apache.org>
> > > wrote:
> > >
> > >> Looks like we didn't manage to merge everything today.
> > >>
> > >> (pending PRs)
> > >> - FLINK-3021 Fix class loading issue for streaming sources
> > >> - FLINK-2974 Add periodic offset committer for Kafka
> > >>
> > >> (merged)
> > >> - 

Re: [DISCUSS] Release Flink 0.10.1 soon

2015-11-23 Thread Robert Metzger
I can confirm that.
Sadly, the repository.apache.org server is not as cooperative as I hoped. I
have everything ready for the release, except the staging repository.
I hope to get it done in the next two hours.

[jira] [Created] (FLINK-3060) Add possibility to integrate custom types into the TypeExtractor

2015-11-23 Thread Timo Walther (JIRA)
Timo Walther created FLINK-3060:
---

 Summary: Add possibility to integrate custom types into the 
TypeExtractor
 Key: FLINK-3060
 URL: https://issues.apache.org/jira/browse/FLINK-3060
 Project: Flink
  Issue Type: New Feature
  Components: Type Serialization System
Reporter: Timo Walther
Priority: Minor


As discussed in [FLINK-3002], it would be nice if we could make custom type 
integration easier by defining an interface/static method that classes can 
implement to create their own type information. That gives users an easy 
extension point.

Custom integrated types need to be checked in `getForObject`, `getForClass` and 
`validateInput`. If we also want to support custom integrated types with 
generics, `createTypeInfoWithTypeHierarchy` needs modifications, too.
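
A purely hypothetical sketch of such an extension point (this issue does not 
fix any names or signatures):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical interface; name and signature are placeholders only.
public interface TypeInfoCreator<T> {
    // A class implementing this would hand the TypeExtractor a ready-made
    // TypeInformation instead of being analyzed via reflection.
    TypeInformation<T> createTypeInfo(Class<T> clazz);
}
{code}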





[jira] [Created] (FLINK-3061) Kafka Consumer is not failing if broker is not available

2015-11-23 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-3061:
-

 Summary: Kafka Consumer is not failing if broker is not available
 Key: FLINK-3061
 URL: https://issues.apache.org/jira/browse/FLINK-3061
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Reporter: Robert Metzger
Assignee: Robert Metzger
 Fix For: 1.0.0


It seems that the FlinkKafkaConsumer just logs the errors when trying to get 
the initial list of partitions for the topic, but it does not fail.

The following code ALWAYS runs, even if there is no broker or zookeeper running.
{code}
def main(args: Array[String]) {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("group.id", "test")
  val stream = env
    .addSource(new FlinkKafkaConsumer082[String]("topic",
      new SimpleStringSchema(), properties))
    .print

  env.execute("Flink Kafka Example")
}
{code}

The runtime consumers are designed to idle when they have no partitions 
assigned, but there is no check that the topic has any partitions at all.
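
A sketch of the kind of fail-fast check that seems to be missing (the helper 
shape and its placement inside the consumer are assumptions):

{code:java}
import java.util.List;

public class PartitionCheck {
    // Hypothetical guard: fail at construction time instead of logging and idling.
    public static <T> List<T> requirePartitions(List<T> partitions, String topic) {
        if (partitions == null || partitions.isEmpty()) {
            throw new RuntimeException("Unable to retrieve any partitions for topic '"
                    + topic + "'. Are the brokers in 'bootstrap.servers' reachable?");
        }
        return partitions;
    }
}
{code}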





[jira] [Created] (FLINK-3062) Kafka Producer is not failing if broker is not available/no partitions available

2015-11-23 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-3062:
-

 Summary: Kafka Producer is not failing if broker is not 
available/no partitions available
 Key: FLINK-3062
 URL: https://issues.apache.org/jira/browse/FLINK-3062
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Reporter: Robert Metzger
Assignee: Robert Metzger
 Fix For: 1.0.0








[jira] [Created] (FLINK-3064) Missing size check in GroupReduceOperatorBase leads to NPE

2015-11-23 Thread Martin Junghanns (JIRA)
Martin Junghanns created FLINK-3064:
---

 Summary: Missing size check in GroupReduceOperatorBase leads to NPE
 Key: FLINK-3064
 URL: https://issues.apache.org/jira/browse/FLINK-3064
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.0.0
Reporter: Martin Junghanns
Assignee: Martin Junghanns
Priority: Minor


The following example leads to an NPE:
{code:java}
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

env.fromCollection(Lists.newArrayList(new Tuple1<>(1L)))
  .filter(new AlwaysFalseFilter()) // returns false for any element
  .sum(0)
  .print();
{code}

In {{GroupReduceOperatorBase}}, it is not always checked whether the input to a 
{{GroupReduceFunction}} is empty. This leads to the NPE when executing the 
{{AggregateOperator}}.
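
For completeness, one possible definition of the filter used above (the issue 
does not include it):

{code:java}
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple1;

// Drops every element, so the downstream sum(0) sees an empty input.
public class AlwaysFalseFilter implements FilterFunction<Tuple1<Long>> {
    @Override
    public boolean filter(Tuple1<Long> value) {
        return false;
    }
}
{code}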





Re: Apache Tinkerpop & Geode Integration?

2015-11-23 Thread Vasiliki Kalavri
Hi James,

thank you for your e-mail and your interest in Flink :)

I've recently taken a _quick_ look into Apache TinkerPop and I think it'd
be very interesting to integrate it with Flink/Gelly.
Are you thinking about something like a Flink GraphComputer, similar to
the Giraph and Spark GraphComputers?
I believe such an integration should be straightforward to implement. You
can start by looking into Flink's iteration operators [1] and Gelly's
iteration abstractions [2].
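
For reference, the DataSet bulk-iteration primitive such a GraphComputer could
build on looks roughly like this (a minimal sketch, not GraphComputer code):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class BulkIterationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Run ten synchronous supersteps over the working set.
        IterativeDataSet<Long> initial = env.fromElements(0L).iterate(10);
        DataSet<Long> step = initial.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value + 1; // one superstep's worth of work
            }
        });
        initial.closeWith(step).print();
    }
}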

Regarding Apache Geode, I'm not familiar with the project, but I'll try to
take a look in the following days!

Cheers,
-Vasia.


[1]:
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#iteration-operators
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/libs/gelly_guide.html#iterative-graph-processing


On 20 November 2015 at 08:32, James Thornton 
wrote:

> Hi -
>
> This is James Thornton (espeed) from the Apache Tinkerpop project (
> http://tinkerpop.incubator.apache.org/).
>
> The Flink iterators should pair well with Gremlin's Graph Traversal Machine
> (
>
> http://www.datastax.com/dev/blog/the-benefits-of-the-gremlin-graph-traversal-machine
> )
> -- it would be good to coordinate on creating an integration.
>
> Also, Apache Geode made a splash today on HN (
> https://news.ycombinator.com/item?id=10596859) -- connecting Geode and
> Flink would be killer. Here's the Geode/Spark connector for reference:
>
>
>
> https://github.com/apache/incubator-geode/tree/develop/gemfire-spark-connector
>
> - James
>


Re: Tagging Flink classes with InterfaceAudience and InterfaceStability

2015-11-23 Thread Nick Dimiduk
>
> Do you know if Hadoop/HBase is also using a Maven plugin to fail a build on
> breaking API changes? I would really like to have such functionality in
> Flink, because we could spot breaking changes very early.


I don't think we have Maven integration for this as of yet. We release
managers run a script, $HBASE/dev-support/check_compatibility.sh, that
generates a source and binary compatibility report. Issues are then
resolved during the period leading up to the release candidate.

> I think Hadoop is relying on a "QA bot" which reads patches from JIRA and
> then does these checks?
>

The "QA bot" is just a collection of shell scripts used during "Patch
Available" status when a patch has been attached to JIRA or when a PR has
been submitted through github. The check_compatibility script could be
included in that automation, I don't see why not. Maybe you'd like to open
a YETUS ticket :)

I've pushed a branch to my own GitHub account with all classes I would make
> public annotated:
>
> https://github.com/apache/flink/compare/master...rmetzger:interface_stability_revapi?expand=1
> Since this is really hard to read, I (half-automated) generated the
> following list of annotated classes:
>
> https://github.com/rmetzger/flink/blob/interface_stability_revapi/annotations.md
>
> Please let me know if you would like to include or exclude classes from
> that list.
> Also, let me know which methods (in stable classes) you would mark as
> experimental.
>
>
>
>
> On Wed, Nov 11, 2015 at 10:17 AM, Aljoscha Krettek 
> wrote:
>
> > +1 for some way of declaring public interfaces as experimental.
> >
> > > On 10 Nov 2015, at 22:24, Stephan Ewen  wrote:
> > >
> > > I think we need an annotation "@PublicExperimental" anyway.
> > >
> > > We can make this annotation such that it can be added to methods, and we
> > > can use it to declare methods in an otherwise public class (such as
> > > DataSet) as experimental.
> > >
> > > On Tue, Nov 10, 2015 at 10:19 PM, Fabian Hueske 
> > wrote:
> > >
> > >> I am not sure if we should always declare complete classes as
> > >> @PublicInterface.
> > >> This definitely makes sense for interfaces and abstract classes such as
> > >> MapFunction or InputFormat, but not necessarily for classes such as
> > >> DataSet that we might want to extend with methods which should not
> > >> immediately be considered stable.
> > >>
> > >>
> > >> 2015-11-10 21:36 GMT+01:00 Vasiliki Kalavri <
> vasilikikala...@gmail.com
> > >:
> > >>
> > >>> Yes, my opinion is that we shouldn't declare the Gelly API frozen yet.
> > >>> We can reconsider when we're closer to the 1.0 release, but if
> > >>> possible, I would give it some more time.
> > >>>
> > >>> -V.
> > >>>
> > >>> On 10 November 2015 at 21:06, Stephan Ewen  wrote:
> > >>>
> >  I think no component should be forced to be stable. It should be an
> >  individual decision for each component, and in some cases even for
> >  individual classes.
> > 
> >  @Vasia If you think Gelly should not be declared interface-frozen, then
> >  this is a good point to raise and it should definitely be reflected.
> >  There is no point in declaring certain APIs as frozen when we are not
> >  yet confident they have converged.
> > 
> > 
> > 
> >  On Tue, Nov 10, 2015 at 8:39 PM, Vasiliki Kalavri <
> >  vasilikikala...@gmail.com
> > > wrote:
> > 
> > > Hi Robert,
> > >
> > > thanks for bringing this up!
> > >
> > > I generally like the idea, but I wouldn't rush to annotate the
> Gelly
> > > classes yet. Gelly hasn't had that many users and I'm quite sure
> > >> we'll
> >  find
> > > things to improve as it gets more exposure.
> > > TBH, I think it's quite unfair to force Gelly (also e.g. ML, Table)
> > >> to
> > >>> a
> > > "1.0" status (from an API stability point of view) since they're
> > >> really
> > > young compared to the other Flink APIs.
> > >
> > > Cheers,
> > > Vasia.
> > > On Nov 10, 2015 8:04 PM, "Robert Metzger" 
> > >> wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> I would like to bring this discussion back to your attention as we
> > >>> seem
> > > to
> > >> approach the 1.0 release of Flink.
> > >>
> > >> My suggestion back in January was to annotate all classes, but I
> > >>> think
> > >> it'll be more feasible to just annotate public classes.
> > >> So how about adding an annotation @PublicInterface
> > >>
> > >> For @PublicInterface, I would annotate classes such as: DataSet,
> > >> DataStream, ExecutionEnvironment, InputFormat, MapFunction,
> > >>> FileSystems
> > > but
> > >> also Gelly for example.
> > >>
> > >> I would not annotate as public components such as ML, Storm
> > > compatibility,
> > >> internals from runtime, 
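
For reference, the two annotations discussed in this thread could look roughly
like this (only the names come from the thread; retention and targets are
assumptions):

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marks a class whose API is considered stable across releases.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface PublicInterface {}

// Marks a method (in an otherwise stable class) or a class as experimental.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface PublicExperimental {}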

[jira] [Created] (FLINK-3063) [py] Remove combiner

2015-11-23 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-3063:
---

 Summary: [py] Remove combiner
 Key: FLINK-3063
 URL: https://issues.apache.org/jira/browse/FLINK-3063
 Project: Flink
  Issue Type: Improvement
  Components: Python API
Affects Versions: 0.10.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.0.0


The current combiner implementation in the Python API is quite a mess. It adds 
a lot of unreadable clutter, is inefficient at times, and can outright break 
in some edge cases.

I will revisit this feature after FLINK-2501 is resolved. Several changes for 
that issue will make the reimplementation easier.





Re: Either left() vs left(value)

2015-11-23 Thread Gyula Fóra
I think it is not too bad to only have the Right/Left classes. You can then
write it like this:

Either<String, Integer> e1 = new Left<>("");
Either<String, Integer> e2 = new Right<>(1);

(this would be pretty much like in Scala)

or we can add static methods like Left.of(...), Right.of(...), which would
work exactly as they do now.

And then we can live without the static methods in Either (Either would
become abstract).
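
A quick sketch of that variant (simplified stand-in types; only the shape
matters):

public abstract class Either<L, R> {

    public static final class Left<L, R> extends Either<L, R> {
        private final L value;
        public Left(L value) { this.value = value; }
        // Distinctly named factory, so it cannot clash with an accessor:
        public static <L, R> Left<L, R> of(L value) { return new Left<>(value); }
        public L left() { return value; }
    }

    public static final class Right<L, R> extends Either<L, R> {
        private final R value;
        public Right(R value) { this.value = value; }
        public static <L, R> Right<L, R> of(R value) { return new Right<>(value); }
        public R right() { return value; }
    }
}

// usage, as above:
// Either<String, Integer> e1 = new Left<>("");
// Either<String, Integer> e2 = Right.of(1);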

Gyula



Re: Either left() vs left(value)

2015-11-23 Thread Vasiliki Kalavri
Either is abstract already ;)
