Watermark through REST API

2018-10-08 Thread Gregory Fee
Hello! I am interested in getting the current low watermark for tasks
in my Flink jobs. I know how to see them in the UI. I'm interested in
getting them programmatically, hopefully via the REST API. The
documentation says that they are exposed as metrics, but I don't see
watermark info in the 'metrics' section of the job detail. Does anyone
know how I might get the watermark information?

Thanks,
-- 
Gregory Fee
Engineer
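
[Editor's note: for reference, a minimal Java sketch of what such a query might look like against Flink's monitoring REST API. The port, job id, vertex id and the exact metric name are assumptions (watermark metrics are named differently across versions, e.g. currentLowWatermark in older releases vs. currentInputWatermark from Flink 1.5 on):]

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical probe: fetch a vertex's watermark metric from the monitoring REST API.
// <job-id> and <vertex-id> are placeholders taken from /jobs and /jobs/<job-id>.
public class WatermarkProbe {
    public static void main(String[] args) throws Exception {
        String url = "http://localhost:8081/jobs/<job-id>/vertices/<vertex-id>/metrics"
                + "?get=0.currentInputWatermark";
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        // The response is a small JSON array, e.g.
        // [{"id":"0.currentInputWatermark","value":"1538993621000"}]
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}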


Re: [DISCUSS] Breaking the Scala API for Scala 2.12 Support

2018-10-08 Thread Aljoscha Krettek
Yes, but I think we would pretty much have to do that. I don't think we can 
stop doing 2.11 releases.

> On 8. Oct 2018, at 15:37, Chesnay Schepler  wrote:
> 
> The infrastructure would only be required if we opt for releasing 2.11 and 
> 2.12 builds simultaneously, correct?
> 
> On 08.10.2018 15:04, Aljoscha Krettek wrote:
>> Breaking the API (or not breaking it but requiring explicit types when using 
>> Scala 2.12) and the Maven infrastructure to actually build a 2.12 release.
>> 
>>> On 8. Oct 2018, at 13:00, Chesnay Schepler  wrote:
>>> 
>>> And the remaining parts would only be about breaking the API?
>>> 
>>> On 08.10.2018 12:24, Aljoscha Krettek wrote:
 I have an open PR that does everything we can do for preparing the code 
 base for Scala 2.12 without breaking the API: 
 https://github.com/apache/flink/pull/6784
 
> On 8. Oct 2018, at 09:56, Chesnay Schepler  wrote:
> 
> I'd rather not maintain 2 master branches. Beyond the maintenance 
> overhead I'm
> wondering about the benefit, as the API break still has to happen at some 
> point.
> 
> @Aljoscha how much work for supporting scala 2.12 can be merged without 
> breaking the API?
> If this is the only blocker I suggest to make the breaking change in 1.8.
> 
> On 05.10.2018 10:31, Till Rohrmann wrote:
>> Thanks Aljoscha for starting this discussion. The described problem 
>> brings
>> us indeed a bit into a pickle. Even with option 1) I think it is somewhat
>> API breaking because everyone who used lambdas without types needs to add
>> them now. Consequently, I only see two real options out of the ones 
>> you've
>> proposed:
>> 
>> 1) Disambiguate the API (either by removing
>> reduceGroup(GroupReduceFunction) or by renaming it to reduceGroupJ)
>> 2) Maintain a 2.11 and 2.12 master branch until we phase 2.11 completely 
>> out
>> 
>> Removing the reduceGroup(GroupReduceFunction) in option 1 is a bit
>> problematic because then all Scala API users who have implemented a
>> GroupReduceFunction need to convert it into a Scala lambda. Moreover, I
>> think it will be problematic with RichGroupReduceFunction which you need 
>> to
>> get access to the RuntimeContext.
>> 
>> Maintaining two master branches puts a lot of burden onto the developers 
>> to
>> always keep the two branches in sync. Ideally I would like to avoid this.
>> 
>> I also played a little bit around with implicit conversions to add the
>> lambda methods in Scala 2.11 on demand, but I was not able to get it to work
>> smoothly.
>> 
>> I'm cross posting this thread to user as well to get some more user
>> feedback.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Oct 4, 2018 at 7:36 PM Elias Levy 
>> wrote:
>> 
>>> The second alternative, with the addition of methods that take functions
>>> with Scala types, seems the most sensible.  I wonder if there is a need
>>> then to maintain the *J Java parameter methods, or whether users could 
>>> just
>>> access the functionality by converting the Scala DataStreams to Java via
>>> .javaStream and whatever the equivalent is for DataSets.
>>> 
>>> On Thu, Oct 4, 2018 at 8:10 AM Aljoscha Krettek 
>>> wrote:
>>> 
 Hi,
 
 I'm currently working on
>>> https://issues.apache.org/jira/browse/FLINK-7811,
 with the goal of adding support for Scala 2.12. There is a bit of a
>>> hurdle
 and I have to explain some context first.
 
 With Scala 2.12, lambdas are implemented using the lambda mechanism of
 Java 8, i.e. Scala lambdas are now SAMs (Single Abstract Method). This
 means that the following two method definitions can both take a lambda:
 
 def map[R](mapper: MapFunction[T, R]): DataSet[R]
 def map[R](fun: T => R): DataSet[R]
 
 The Scala compiler gives precedence to the lambda version when you call
 map() with a lambda in simple cases, so it works here. You could still
>>> call
 map() with a lambda if the lambda version of the method weren't here
 because they are now considered the same. For Scala 2.11 we need both
 signatures, though, to allow calling with a lambda and with a
>>> MapFunction.
 The problem is with more complicated method signatures, like:
 
 def reduceGroup[R](fun: (scala.Iterator[T], Collector[R]) => Unit):
 DataSet[R]
 
 def reduceGroup[R](reducer: GroupReduceFunction[T, R]): DataSet[R]
 
 (for reference, GroupReduceFunction is a SAM with void
 reduce(java.lang.Iterable values, Collector out))
 
 These two signatures are not the same but similar enough for the Scala
 2.12 compiler to "get confused". In Scala 2.11, I could call
>>> 

Re: Duplicates in self join

2018-10-08 Thread Fabian Hueske
Did you check the new interval join that was added with Flink 1.6.0 [1]?
It might be better suited because each record has its own boundaries based
on its timestamp and the join window interval.

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#interval-join
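
[Editor's note: a rough sketch of what the interval join could look like for the triad use case below; Edge stands in for the user's edge type (with time/source/target fields) and queryWindowMs for the temporal bound of the pattern, both assumptions:]

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// edges is an event-time DataStream<Edge>.
DataStream<Tuple2<Edge, Edge>> triads = edges
    .keyBy(e -> e.target)                             // vertexB of the first edge
    .intervalJoin(edges.keyBy(e -> e.source))         // vertexB of the second edge
    .between(Time.milliseconds(0), Time.milliseconds(queryWindowMs))
    .process(new ProcessJoinFunction<Edge, Edge, Tuple2<Edge, Edge>>() {
        @Override
        public void processElement(Edge first, Edge second, Context ctx,
                                   Collector<Tuple2<Edge, Edge>> out) {
            out.collect(Tuple2.of(first, second));    // each matching pair is emitted once
        }
    });

Because the join boundaries are evaluated per record instead of per overlapping window, each qualifying pair is emitted exactly once.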

On Mon, Oct 8, 2018 at 16:44 Eric L Goodman <
eric.good...@colorado.edu> wrote:

> If I change it to a Tumbling window some of the results will be lost since
> the pattern I'm matching has a temporal extent, so if the pattern starts in
> one tumbling window and ends in the next, it won't be reported.  Based on
> the temporal length of the query, you can set the sliding window and the
> window lengths to capture all the patterns, though as you note, you will
> get duplicates.
>
> On Mon, Oct 8, 2018 at 7:46 AM Hequn Cheng  wrote:
>
>> Hi Eric,
>>
>> Can you change Sliding window to Tumbling window? The data of different
>> sliding windows are likely to overlap.
>>
>> Best, Hequn
>>
>> On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński  wrote:
>>
>>> Hey,
>>> IMHO, the simplest way in your case would be to use the Evictor to evict
>>> duplicate values after the window is generated. Have a look at it here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>>>
>>> Best Regards,
>>> Dominik.
>>>
>>> On Mon, Oct 8, 2018 at 08:00 Eric L Goodman 
>>> wrote:
>>>
 What is the best way to avoid or remove duplicates when joining a
 stream with itself?  I'm performing a streaming temporal triangle
 computation and the first part is to find triads of two edges of the form
 vertexA->vertexB and vertexB->vertexC (and there are temporal constraints
 where the first edge occurs before the second edge).  To do that, I have
 the following code:

 DataStream triads = edges.join(edges)
 .where(new DestKeySelector())
 .equalTo(new SourceKeySelector())
 .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
 Time.milliseconds(slideSizeMs)))
 .apply(new EdgeJoiner(queryWindow));

 However, when I look at the triads being built, there are two copies of 
 each triad.

 For example, if I create ten edges (time, source, target):

 0.0, 4, 0

 0.01, 1, 5

 0.02, 3, 7

 0.03, 0, 8

 0.04, 0, 9

 0.05, 4, 8

 0.06, 4, 3

 0.07, 5, 9

 0.08, 7, 1

 0.09, 9, 6


 It creates the following triads (time1, source1, target1, time2,
 source2, targe2). Note there are two copies of each.

 0.0, 4, 0 0.03, 0, 8

 0.0, 4, 0 0.03, 0, 8

 0.0, 4, 0 0.04, 0, 9

 0.0, 4, 0 0.04, 0, 9

 0.01, 1, 5 0.07, 5, 9

 0.01, 1, 5 0.07, 5, 9

 0.02, 3, 7 0.08, 7, 1

 0.02, 3, 7 0.08, 7, 1

 0.04, 0, 9 0.09, 9, 6

 0.04, 0, 9 0.09, 9, 6

 0.07, 5, 9 0.09, 9, 6

 0.07, 5, 9 0.09, 9, 6

 I'm assuming this behavior has something to do with the joining of "edges" 
 with itself.

 I can provide more code if that would be helpful, but I believe I've 
 captured the most salient portion.








Re: Duplicates in self join

2018-10-08 Thread Eric L Goodman
If I change it to a Tumbling window some of the results will be lost since
the pattern I'm matching has a temporal extent, so if the pattern starts in
one tumbling window and ends in the next, it won't be reported.  Based on
the temporal length of the query, you can set the sliding window and the
window lengths to capture all the patterns, though as you note, you will
get duplicates.

On Mon, Oct 8, 2018 at 7:46 AM Hequn Cheng  wrote:

> Hi Eric,
>
> Can you change Sliding window to Tumbling window? The data of different
> sliding windows are likely to overlap.
>
> Best, Hequn
>
> On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński  wrote:
>
>> Hey,
>> IMHO, the simplest way in your case would be to use the Evictor to evict
>> duplicate values after the window is generated. Have a look at it here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>>
>> Best Regards,
>> Dominik.
>>
>> On Mon, Oct 8, 2018 at 08:00 Eric L Goodman 
>> wrote:
>>
>>> What is the best way to avoid or remove duplicates when joining a stream
>>> with itself?  I'm performing a streaming temporal triangle computation and
>>> the first part is to find triads of two edges of the form vertexA->vertexB
>>> and vertexB->vertexC (and there are temporal constraints where the first
>>> edge occurs before the second edge).  To do that, I have the following code:
>>>
>>> DataStream triads = edges.join(edges)
>>> .where(new DestKeySelector())
>>> .equalTo(new SourceKeySelector())
>>> .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
>>> Time.milliseconds(slideSizeMs)))
>>> .apply(new EdgeJoiner(queryWindow));
>>>
>>> However, when I look at the triads being built, there are two copies of 
>>> each triad.
>>>
>>> For example, if I create ten edges (time, source, target):
>>>
>>> 0.0, 4, 0
>>>
>>> 0.01, 1, 5
>>>
>>> 0.02, 3, 7
>>>
>>> 0.03, 0, 8
>>>
>>> 0.04, 0, 9
>>>
>>> 0.05, 4, 8
>>>
>>> 0.06, 4, 3
>>>
>>> 0.07, 5, 9
>>>
>>> 0.08, 7, 1
>>>
>>> 0.09, 9, 6
>>>
>>>
>>> It creates the following triads (time1, source1, target1, time2,
>>> source2, target2). Note there are two copies of each.
>>>
>>> 0.0, 4, 0 0.03, 0, 8
>>>
>>> 0.0, 4, 0 0.03, 0, 8
>>>
>>> 0.0, 4, 0 0.04, 0, 9
>>>
>>> 0.0, 4, 0 0.04, 0, 9
>>>
>>> 0.01, 1, 5 0.07, 5, 9
>>>
>>> 0.01, 1, 5 0.07, 5, 9
>>>
>>> 0.02, 3, 7 0.08, 7, 1
>>>
>>> 0.02, 3, 7 0.08, 7, 1
>>>
>>> 0.04, 0, 9 0.09, 9, 6
>>>
>>> 0.04, 0, 9 0.09, 9, 6
>>>
>>> 0.07, 5, 9 0.09, 9, 6
>>>
>>> 0.07, 5, 9 0.09, 9, 6
>>>
>>> I'm assuming this behavior has something to do with the joining of "edges" 
>>> with itself.
>>>
>>> I can provide more code if that would be helpful, but I believe I've 
>>> captured the most salient portion.
>>>
>>>
>>>
>>>
>>>
>>>


Re: Duplicates in self join

2018-10-08 Thread Hequn Cheng
Hi Eric,

Can you change Sliding window to Tumbling window? The data of different
sliding windows are likely to overlap.

Best, Hequn

On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński  wrote:

> Hey,
> IMHO, the simplest way in your case would be to use the Evictor to evict
> duplicate values after the window is generated. Have a look at it here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>
> Best Regards,
> Dominik.
>
> On Mon, Oct 8, 2018 at 08:00 Eric L Goodman 
> wrote:
>
>> What is the best way to avoid or remove duplicates when joining a stream
>> with itself?  I'm performing a streaming temporal triangle computation and
>> the first part is to find triads of two edges of the form vertexA->vertexB
>> and vertexB->vertexC (and there are temporal constraints where the first
>> edge occurs before the second edge).  To do that, I have the following code:
>>
>> DataStream triads = edges.join(edges)
>> .where(new DestKeySelector())
>> .equalTo(new SourceKeySelector())
>> .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
>> Time.milliseconds(slideSizeMs)))
>> .apply(new EdgeJoiner(queryWindow));
>>
>> However, when I look at the triads being built, there are two copies of each 
>> triad.
>>
>> For example, if I create ten edges (time, source, target):
>>
>> 0.0, 4, 0
>>
>> 0.01, 1, 5
>>
>> 0.02, 3, 7
>>
>> 0.03, 0, 8
>>
>> 0.04, 0, 9
>>
>> 0.05, 4, 8
>>
>> 0.06, 4, 3
>>
>> 0.07, 5, 9
>>
>> 0.08, 7, 1
>>
>> 0.09, 9, 6
>>
>>
>> It creates the following triads (time1, source1, target1, time2, source2,
>> target2). Note there are two copies of each.
>>
>> 0.0, 4, 0 0.03, 0, 8
>>
>> 0.0, 4, 0 0.03, 0, 8
>>
>> 0.0, 4, 0 0.04, 0, 9
>>
>> 0.0, 4, 0 0.04, 0, 9
>>
>> 0.01, 1, 5 0.07, 5, 9
>>
>> 0.01, 1, 5 0.07, 5, 9
>>
>> 0.02, 3, 7 0.08, 7, 1
>>
>> 0.02, 3, 7 0.08, 7, 1
>>
>> 0.04, 0, 9 0.09, 9, 6
>>
>> 0.04, 0, 9 0.09, 9, 6
>>
>> 0.07, 5, 9 0.09, 9, 6
>>
>> 0.07, 5, 9 0.09, 9, 6
>>
>> I'm assuming this behavior has something to do with the joining of "edges" 
>> with itself.
>>
>> I can provide more code if that would be helpful, but I believe I've 
>> captured the most salient portion.
>>
>>
>>
>>
>>
>>


Re: [DISCUSS] Breaking the Scala API for Scala 2.12 Support

2018-10-08 Thread Chesnay Schepler
The infrastructure would only be required if we opt for releasing 2.11 
and 2.12 builds simultaneously, correct?


On 08.10.2018 15:04, Aljoscha Krettek wrote:

Breaking the API (or not breaking it but requiring explicit types when using 
Scala 2.12) and the Maven infrastructure to actually build a 2.12 release.


On 8. Oct 2018, at 13:00, Chesnay Schepler  wrote:

And the remaining parts would only be about breaking the API?

On 08.10.2018 12:24, Aljoscha Krettek wrote:

I have an open PR that does everything we can do for preparing the code base 
for Scala 2.12 without breaking the API: 
https://github.com/apache/flink/pull/6784


On 8. Oct 2018, at 09:56, Chesnay Schepler  wrote:

I'd rather not maintain 2 master branches. Beyond the maintenance overhead I'm
wondering about the benefit, as the API break still has to happen at some point.

@Aljoscha how much work for supporting scala 2.12 can be merged without 
breaking the API?
If this is the only blocker I suggest to make the breaking change in 1.8.

On 05.10.2018 10:31, Till Rohrmann wrote:

Thanks Aljoscha for starting this discussion. The described problem brings
us indeed a bit into a pickle. Even with option 1) I think it is somewhat
API breaking because everyone who used lambdas without types needs to add
them now. Consequently, I only see two real options out of the ones you've
proposed:

1) Disambiguate the API (either by removing
reduceGroup(GroupReduceFunction) or by renaming it to reduceGroupJ)
2) Maintain a 2.11 and 2.12 master branch until we phase 2.11 completely out

Removing the reduceGroup(GroupReduceFunction) in option 1 is a bit
problematic because then all Scala API users who have implemented a
GroupReduceFunction need to convert it into a Scala lambda. Moreover, I
think it will be problematic with RichGroupReduceFunction which you need to
get access to the RuntimeContext.

Maintaining two master branches puts a lot of burden onto the developers to
always keep the two branches in sync. Ideally I would like to avoid this.

I also played a little bit around with implicit conversions to add the
lambda methods in Scala 2.11 on demand, but I was not able to get it to work
smoothly.

I'm cross posting this thread to user as well to get some more user
feedback.

Cheers,
Till

On Thu, Oct 4, 2018 at 7:36 PM Elias Levy 
wrote:


The second alternative, with the addition of methods that take functions
with Scala types, seems the most sensible.  I wonder if there is a need
then to maintain the *J Java parameter methods, or whether users could just
access the functionality by converting the Scala DataStreams to Java via
.javaStream and whatever the equivalent is for DataSets.

On Thu, Oct 4, 2018 at 8:10 AM Aljoscha Krettek 
wrote:


Hi,

I'm currently working on

https://issues.apache.org/jira/browse/FLINK-7811,

with the goal of adding support for Scala 2.12. There is a bit of a

hurdle

and I have to explain some context first.

With Scala 2.12, lambdas are implemented using the lambda mechanism of
Java 8, i.e. Scala lambdas are now SAMs (Single Abstract Method). This
means that the following two method definitions can both take a lambda:

def map[R](mapper: MapFunction[T, R]): DataSet[R]
def map[R](fun: T => R): DataSet[R]

The Scala compiler gives precedence to the lambda version when you call
map() with a lambda in simple cases, so it works here. You could still

call

map() with a lambda if the lambda version of the method weren't here
because they are now considered the same. For Scala 2.11 we need both
signatures, though, to allow calling with a lambda and with a

MapFunction.

The problem is with more complicated method signatures, like:

def reduceGroup[R](fun: (scala.Iterator[T], Collector[R]) => Unit):
DataSet[R]

def reduceGroup[R](reducer: GroupReduceFunction[T, R]): DataSet[R]

(for reference, GroupReduceFunction is a SAM with void
reduce(java.lang.Iterable values, Collector out))

These two signatures are not the same but similar enough for the Scala
2.12 compiler to "get confused". In Scala 2.11, I could call

reduceGroup()

with a lambda that doesn't have parameter type definitions and things

would

be fine. With Scala 2.12 I can't do that because the compiler can't

figure

out which method to call and requires explicit type definitions on the
lambda parameters.

I see some solutions for this:

1. Keep the methods as is, this would force people to always explicitly
specify parameter types on their lambdas.

2. Rename the second method to reduceGroupJ() to signal that it takes a
user function that takes Java-style interfaces (the first parameter is
java.lang.Iterable while the Scala lambda takes a scala.Iterator). This
disambiguates the code, users can use lambdas without specifying explicit
parameter types but breaks the API.

One effect of 2. would be that we can add a reduceGroup() method that
takes a api.scala.GroupReduceFunction that takes proper Scala types, thus
it would allow people to implement user functions without 

Re: Using several Kerberos keytabs in standalone cluster

2018-10-08 Thread Aljoscha Krettek
Hi Olga,

I think right now this is not possible because we piggyback on the YARN shipment 
functionality for shipping the Keytab along with the TaskManagers.

I think changing this would require somewhat bigger changes because loading the 
Keytab happens when the TaskManagers are brought up.

Best,
Aljoscha

> On 8. Oct 2018, at 04:52, Olga Luganska  wrote:
> 
> Hello,
> 
> According to the documentation, the security setup is shared by all the jobs 
> on the same cluster, and if users need to use a different keytab, 
> it is easily achievable in a YARN cluster setup by starting a new cluster with 
> a different flink-conf.yaml.
> Is it possible to set up a standalone cluster in such a way that each user can 
> run his respective jobs with his own keytab?
> 
> Thank you very much,
> Olga



Re: [DISCUSS] Breaking the Scala API for Scala 2.12 Support

2018-10-08 Thread Aljoscha Krettek
Breaking the API (or not breaking it but requiring explicit types when using 
Scala 2.12) and the Maven infrastructure to actually build a 2.12 release.

> On 8. Oct 2018, at 13:00, Chesnay Schepler  wrote:
> 
> And the remaining parts would only be about breaking the API?
> 
> On 08.10.2018 12:24, Aljoscha Krettek wrote:
>> I have an open PR that does everything we can do for preparing the code base 
>> for Scala 2.12 without breaking the API: 
>> https://github.com/apache/flink/pull/6784
>> 
>>> On 8. Oct 2018, at 09:56, Chesnay Schepler  wrote:
>>> 
>>> I'd rather not maintain 2 master branches. Beyond the maintenance overhead 
>>> I'm
>>> wondering about the benefit, as the API break still has to happen at some 
>>> point.
>>> 
>>> @Aljoscha how much work for supporting scala 2.12 can be merged without 
>>> breaking the API?
>>> If this is the only blocker I suggest to make the breaking change in 1.8.
>>> 
>>> On 05.10.2018 10:31, Till Rohrmann wrote:
 Thanks Aljoscha for starting this discussion. The described problem brings
 us indeed a bit into a pickle. Even with option 1) I think it is somewhat
 API breaking because everyone who used lambdas without types needs to add
 them now. Consequently, I only see two real options out of the ones you've
 proposed:
 
 1) Disambiguate the API (either by removing
 reduceGroup(GroupReduceFunction) or by renaming it to reduceGroupJ)
 2) Maintain a 2.11 and 2.12 master branch until we phase 2.11 completely 
 out
 
 Removing the reduceGroup(GroupReduceFunction) in option 1 is a bit
 problematic because then all Scala API users who have implemented a
 GroupReduceFunction need to convert it into a Scala lambda. Moreover, I
 think it will be problematic with RichGroupReduceFunction which you need to
 get access to the RuntimeContext.
 
 Maintaining two master branches puts a lot of burden onto the developers to
 always keep the two branches in sync. Ideally I would like to avoid this.
 
 I also played a little bit around with implicit conversions to add the
 lambda methods in Scala 2.11 on demand, but I was not able to get it work
 smoothly.
 
 I'm cross posting this thread to user as well to get some more user
 feedback.
 
 Cheers,
 Till
 
 On Thu, Oct 4, 2018 at 7:36 PM Elias Levy 
 wrote:
 
> The second alternative, with the addition of methods that take functions
> with Scala types, seems the most sensible.  I wonder if there is a need
> then to maintain the *J Java parameter methods, or whether users could 
> just
> access the functionality by converting the Scala DataStreams to Java via
> .javaStream and whatever the equivalent is for DataSets.
> 
> On Thu, Oct 4, 2018 at 8:10 AM Aljoscha Krettek 
> wrote:
> 
>> Hi,
>> 
>> I'm currently working on
> https://issues.apache.org/jira/browse/FLINK-7811,
>> with the goal of adding support for Scala 2.12. There is a bit of a
> hurdle
>> and I have to explain some context first.
>> 
>> With Scala 2.12, lambdas are implemented using the lambda mechanism of
>> Java 8, i.e. Scala lambdas are now SAMs (Single Abstract Method). This
>> means that the following two method definitions can both take a lambda:
>> 
>> def map[R](mapper: MapFunction[T, R]): DataSet[R]
>> def map[R](fun: T => R): DataSet[R]
>> 
>> The Scala compiler gives precedence to the lambda version when you call
>> map() with a lambda in simple cases, so it works here. You could still
> call
>> map() with a lambda if the lambda version of the method weren't here
>> because they are now considered the same. For Scala 2.11 we need both
>> signatures, though, to allow calling with a lambda and with a
> MapFunction.
>> The problem is with more complicated method signatures, like:
>> 
>> def reduceGroup[R](fun: (scala.Iterator[T], Collector[R]) => Unit):
>> DataSet[R]
>> 
>> def reduceGroup[R](reducer: GroupReduceFunction[T, R]): DataSet[R]
>> 
>> (for reference, GroupReduceFunction is a SAM with void
>> reduce(java.lang.Iterable values, Collector out))
>> 
>> These two signatures are not the same but similar enough for the Scala
>> 2.12 compiler to "get confused". In Scala 2.11, I could call
> reduceGroup()
>> with a lambda that doesn't have parameter type definitions and things
> would
>> be fine. With Scala 2.12 I can't do that because the compiler can't
> figure
>> out which method to call and requires explicit type definitions on the
>> lambda parameters.
>> 
>> I see some solutions for this:
>> 
>> 1. Keep the methods as is, this would force people to always explicitly
>> specify parameter types on their lambdas.
>> 
>> 2. Rename the second method to reduceGroupJ() to signal that it takes a
>> 

Re: flink memory management / temp-io dir question

2018-10-08 Thread Till Rohrmann
Hi Anand,

spilling using the io directories is only relevant for Flink's batch
processing. This happens, for example if you enable blocking data exchange
where the produced data cannot be kept in memory. Moreover, it is used by
many of Flink's out-of-core data structures to enable exactly this feature
(e.g. users are the MutableHashTable, the MergeIterator to combine sorted
data which has been spilled, or the SorterMerger to actually spill data).

In streaming Flink uses the RocksDB state backend to spill very large state
gracefully to disk. Thus, you would need to configure RocksDB in order to
control the spilling behaviour.

Cheers,
Till
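
[Editor's note: as an illustration, a minimal sketch of enabling and pointing the RocksDB state backend; the checkpoint URI and local directory are placeholders, and the snippet goes inside a main() that declares throws Exception:]

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Keyed state is kept in RocksDB on local disk instead of on the JVM heap.
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true); // incremental checkpoints
backend.setDbStoragePath("/data/flink/rocksdb"); // RocksDB's local working directory
env.setStateBackend(backend);

In the listing quoted below, the WindowOperator state under the flink-io-* directory is most likely RocksDB's local working state, which by default lives in the TaskManager's temporary io directories; relocating it via setDbStoragePath (or the io temp directory configuration) is the relevant knob, rather than the TaskManager heap.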

On Mon, Oct 8, 2018 at 2:18 PM Kostas Kloudas 
wrote:

> Sorry, I forgot to cc’ Till.
>
> On Oct 8, 2018, at 2:17 PM, Kostas Kloudas 
> wrote:
>
> Hi Anand,
>
> I think that Till is the best person to answer your question.
>
> Cheers,
> Kostas
>
> On Oct 5, 2018, at 3:44 PM, anand.gopin...@ubs.com wrote:
>
> Hi ,
> I had a question with respect to Flink memory management / overspill to /tmp.
>
> In the docs (
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#configuring-temporary-io-directories)
> it says: Although Flink aims to process as much data in main memory as
> possible, it is not uncommon that more data needs to be processed than
> memory is available. Flink’s runtime is designed to write temporary data to
> disk to handle these situations
>
> In a  flink job  that processes a couple streams of 1M events in a
> windowed co group function with parallelism 8 - we see 8 dirs created in
> /tmp with 100s of Meg of data, the name of each dir seems aligned to the
> data for each parallel thread windowing against the co-group  operator
>
> e.g.
> bash-4.2$ du -sh *
> 0   flink-dist-cache-a4a69215-665a-4c3c-8d90-416cbe192f26
> 352Mflink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e
> 4.0KlocalState
> 7.2Mrocksdb-lib-03d9460b15e6bf6af4f3d9b0ff7980c3
>
> bash-4.2$ du -sh flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/*
> ...
> 36M
> flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__1_8__uuid_93307150-4f62-4b06-a71e-0230360f7d86
> 36M
> flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__2_8__uuid_7b2f8957-7044-4bb3-869e-28843bd737a1
> 36M
> flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__3_8__uuid_54306a44-7e06-45ae-ba0e-4649887bca7e
> ...
>
> I was wondering can / should this 'over spill' be avoided by increasing
> the heap of the task manager or another config or should I not worry about
> it?
> Is there more information/docs on how this data is used/ cleaned up & what
> is the cost of this overspill to latency/ checkpointing? Any impact I
> should be aware of?
>
> thanks
> Anand
>

Re: error in using kafka in flink

2018-10-08 Thread marzieh ghasemi
Hello

Thank you very much, but I imported "FlinkKafkaProducer09" and changed
"FlinkKafkaProducer08" to it. Then the problem was solved.

On Mon, Oct 8, 2018 at 3:39 PM Kostas Kloudas 
wrote:

> Hi Marzieh,
>
> This is because of a mismatch between your Kafka version
> and the one your job assumes (0.8).
>
> You should use an older Kafka version (0.8) for the job to run
> out-of-the-box or update your job to use FlinkKafkaProducer011.
>
> Cheers,
> Kostas
>
> On Oct 6, 2018, at 2:13 PM, marzieh ghasemi 
> wrote:
>
> Hello
>
> I downloaded kafka and followed these instructions step by step:
>
> cd kafka_2.11-2
>
> # start zookeeper server
> ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
>
> # start broker
> ./bin/kafka-server-start.sh ./config/server.properties
>
> # create topic “test”
>  ./bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181
> --partitions 1 --replication-f
>
> # consume from the topic using the console consumer
> ./bin/kafka-console-consumer.sh --topic test --zookeeper localhost:2181
>
> # produce something into the topic (write something and hit enter)
> ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
>
> Also, I added "flink-connector-kafka" and "kafka-client" dependencies to
> "pom.xml".
> But while I run the example of "Monitoring the Wikipedia Edit Stream" I
> got this error:
> "cannot resolve symbol FlinkKafkaProducer08". I searched a lot but I couldn't
> find the solution.
>
> Would you please help me?
>
> Thank you in advance.
>
>
>


Re: flink memory management / temp-io dir question

2018-10-08 Thread Kostas Kloudas
Sorry, I forgot to cc’ Till.

> On Oct 8, 2018, at 2:17 PM, Kostas Kloudas  
> wrote:
> 
> Hi Anand,
> 
> I think that Till is the best person to answer your question.
> 
> Cheers,
> Kostas
> 
>> On Oct 5, 2018, at 3:44 PM, anand.gopin...@ubs.com 
>>  wrote:
>> 
>> Hi , 
>> I had a question with respect to Flink memory management / overspill to /tmp.
>>  
>> In the docs 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#configuring-temporary-io-directories
>>  
>> )
>>  it says: Although Flink aims to process as much data in main memory as 
>> possible, it is not uncommon that more data needs to be processed than 
>> memory is available. Flink’s runtime is designed to write temporary data to 
>> disk to handle these situations
>>  
>> In a  flink job  that processes a couple streams of 1M events in a  windowed 
>> co group function with parallelism 8 - we see 8 dirs created in /tmp with 
>> 100s of Meg of data, the name of each dir seems aligned to the data for each 
>> parallel thread windowing against the co-group  operator 
>>  
>> e.g.
>> bash-4.2$ du -sh *
>> 0   flink-dist-cache-a4a69215-665a-4c3c-8d90-416cbe192f26
>> 352Mflink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e
>> 4.0KlocalState
>> 7.2Mrocksdb-lib-03d9460b15e6bf6af4f3d9b0ff7980c3
>>  
>> bash-4.2$ du -sh flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/*
>> ...
>> 36M 
>> flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__1_8__uuid_93307150-4f62-4b06-a71e-0230360f7d86
>> 36M 
>> flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__2_8__uuid_7b2f8957-7044-4bb3-869e-28843bd737a1
>> 36M 
>> flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__3_8__uuid_54306a44-7e06-45ae-ba0e-4649887bca7e
>> ...
>>  
>> I was wondering can / should this 'over spill' be avoided by increasing the 
>> heap of the task manager or another config or should I not worry about it?
>> Is there more information/docs on how this data is used/ cleaned up & what 
>> is the cost of this overspill to latency/ checkpointing? Any impact I should 
>> be aware of?
>>  
>> thanks 
>> Anand
>> 

Re: flink memory management / temp-io dir question

2018-10-08 Thread Kostas Kloudas
Hi Anand,

I think that Till is the best person to answer your question.

Cheers,
Kostas

> On Oct 5, 2018, at 3:44 PM, anand.gopin...@ubs.com wrote:
> 
> Hi , 
> I had a question with respect to Flink memory management / overspill to /tmp.
>  
> In the docs 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#configuring-temporary-io-directories
>  
> )
>  it says: Although Flink aims to process as much data in main memory as 
> possible, it is not uncommon that more data needs to be processed than memory 
> is available. Flink’s runtime is designed to write temporary data to disk to 
> handle these situations
>  
> In a  flink job  that processes a couple streams of 1M events in a  windowed 
> co group function with parallelism 8 - we see 8 dirs created in /tmp with 
> 100s of Meg of data, the name of each dir seems aligned to the data for each 
> parallel thread windowing against the co-group  operator 
>  
> e.g.
> bash-4.2$ du -sh *
> 0   flink-dist-cache-a4a69215-665a-4c3c-8d90-416cbe192f26
> 352Mflink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e
> 4.0KlocalState
> 7.2Mrocksdb-lib-03d9460b15e6bf6af4f3d9b0ff7980c3
>  
> bash-4.2$ du -sh flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/*
> ...
> 36M 
> flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__1_8__uuid_93307150-4f62-4b06-a71e-0230360f7d86
> 36M 
> flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__2_8__uuid_7b2f8957-7044-4bb3-869e-28843bd737a1
> 36M 
> flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__3_8__uuid_54306a44-7e06-45ae-ba0e-4649887bca7e
> ...
>  
> I was wondering can / should this 'over spill' be avoided by increasing the 
> heap of the task manager or another config or should I not worry about it?
> Is there more information/docs on how this data is used/ cleaned up & what is 
> the cost of this overspill to latency/ checkpointing? Any impact I should be 
> aware of?
>  
> thanks 
> Anand
> 

Re: error in using kafka in flink

2018-10-08 Thread Kostas Kloudas
Hi Marzieh,

This is because of a mismatch between your Kafka version 
and the one your job assumes (0.8). 

You should use an older Kafka version (0.8) for the job to run 
out-of-the-box or update your job to use FlinkKafkaProducer011.
 
Cheers,
Kostas
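
[Editor's note: for illustration, a minimal sketch of the 0.11 producer; the broker list and topic are placeholders, and it assumes the flink-connector-kafka-0.11 dependency is on the classpath:]

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

// result: the DataStream<String> that was previously written with FlinkKafkaProducer08
result.addSink(
    new FlinkKafkaProducer011<>("localhost:9092", "wiki-result", new SimpleStringSchema()));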

> On Oct 6, 2018, at 2:13 PM, marzieh ghasemi  
> wrote:
> 
> Hello
> 
> I downloaded kafka and followed these instructions step by step:
> 
> cd kafka_2.11-2
> 
> # start zookeeper server
> ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
> 
> # start broker
> ./bin/kafka-server-start.sh ./config/server.properties 
> 
> # create topic “test”
>  ./bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 
> --partitions 1 --replication-f
> 
> # consume from the topic using the console consumer
> ./bin/kafka-console-consumer.sh --topic test --zookeeper localhost:2181
> 
> # produce something into the topic (write something and hit enter)
> ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
> 
> Also, I added "flink-connector-kafka" and "kafka-client" dependencies to 
> "pom.xml". 
> But while I run the example of "Monitoring the Wikipedia Edit Stream" I got 
> this error:
> "cannot resolve symbol FlinkKafkaProducer08". I searched a lot but I couldn't 
> find the solution. 
> 
> Would you please help me?
> 
> Thank you in advance.



Re: [DISCUSS] Breaking the Scala API for Scala 2.12 Support

2018-10-08 Thread Chesnay Schepler

And the remaining parts would only be about breaking the API?

On 08.10.2018 12:24, Aljoscha Krettek wrote:

I have an open PR that does everything we can do for preparing the code base 
for Scala 2.12 without breaking the API: 
https://github.com/apache/flink/pull/6784


On 8. Oct 2018, at 09:56, Chesnay Schepler  wrote:

I'd rather not maintain 2 master branches. Beyond the maintenance overhead I'm
wondering about the benefit, as the API break still has to happen at some point.

@Aljoscha how much work for supporting scala 2.12 can be merged without 
breaking the API?
If this is the only blocker I suggest to make the breaking change in 1.8.

On 05.10.2018 10:31, Till Rohrmann wrote:

Thanks Aljoscha for starting this discussion. The described problem brings
us indeed a bit into a pickle. Even with option 1) I think it is somewhat
API breaking because everyone who used lambdas without types needs to add
them now. Consequently, I only see two real options out of the ones you've
proposed:

1) Disambiguate the API (either by removing
reduceGroup(GroupReduceFunction) or by renaming it to reduceGroupJ)
2) Maintain a 2.11 and 2.12 master branch until we phase 2.11 completely out

Removing the reduceGroup(GroupReduceFunction) in option 1 is a bit
problematic because then all Scala API users who have implemented a
GroupReduceFunction need to convert it into a Scala lambda. Moreover, I
think it will be problematic with RichGroupReduceFunction which you need to
get access to the RuntimeContext.

Maintaining two master branches puts a lot of burden onto the developers to
always keep the two branches in sync. Ideally I would like to avoid this.

I also played a little bit around with implicit conversions to add the
lambda methods in Scala 2.11 on demand, but I was not able to get it to work
smoothly.

I'm cross posting this thread to user as well to get some more user
feedback.

Cheers,
Till

On Thu, Oct 4, 2018 at 7:36 PM Elias Levy 
wrote:


The second alternative, with the addition of methods that take functions
with Scala types, seems the most sensible.  I wonder if there is a need
then to maintain the *J Java parameter methods, or whether users could just
access the functionality by converting the Scala DataStreams to Java via
.javaStream and whatever the equivalent is for DataSets.

On Thu, Oct 4, 2018 at 8:10 AM Aljoscha Krettek 
wrote:


Hi,

I'm currently working on

https://issues.apache.org/jira/browse/FLINK-7811,

with the goal of adding support for Scala 2.12. There is a bit of a

hurdle

and I have to explain some context first.

With Scala 2.12, lambdas are implemented using the lambda mechanism of
Java 8, i.e. Scala lambdas are now SAMs (Single Abstract Method). This
means that the following two method definitions can both take a lambda:

def map[R](mapper: MapFunction[T, R]): DataSet[R]
def map[R](fun: T => R): DataSet[R]

The Scala compiler gives precedence to the lambda version when you call
map() with a lambda in simple cases, so it works here. You could still

call

map() with a lambda if the lambda version of the method weren't here
because they are now considered the same. For Scala 2.11 we need both
signatures, though, to allow calling with a lambda and with a

MapFunction.

The problem is with more complicated method signatures, like:

def reduceGroup[R](fun: (scala.Iterator[T], Collector[R]) => Unit):
DataSet[R]

def reduceGroup[R](reducer: GroupReduceFunction[T, R]): DataSet[R]

(for reference, GroupReduceFunction is a SAM with void
reduce(java.lang.Iterable values, Collector out))

These two signatures are not the same but similar enough for the Scala
2.12 compiler to "get confused". In Scala 2.11, I could call

reduceGroup()

with a lambda that doesn't have parameter type definitions and things

would

be fine. With Scala 2.12 I can't do that because the compiler can't

figure

out which method to call and requires explicit type definitions on the
lambda parameters.

I see some solutions for this:

1. Keep the methods as is, this would force people to always explicitly
specify parameter types on their lambdas.

2. Rename the second method to reduceGroupJ() to signal that it takes a
user function that takes Java-style interfaces (the first parameter is
java.lang.Iterable while the Scala lambda takes a scala.Iterator). This
disambiguates the code, users can use lambdas without specifying explicit
parameter types but breaks the API.

One effect of 2. would be that we can add a reduceGroup() method that
takes a api.scala.GroupReduceFunction that takes proper Scala types, thus
it would allow people to implement user functions without having to cast
the various Iterator/Iterable parameters.

Either way, people would have to adapt their code when moving to Scala
2.12 in some way, depending on what style of methods they use.

There is also solution 2.5:

2.5 Rename the methods only in the Scala 2.12 build of Flink and keep the
old method names for Scala 2.11. This would require some infrastructure

Job manager logs for previous YARN attempts

2018-10-08 Thread Pawel Bartoszek
Hi,

I am looking into why YARN starts a new application attempt on Flink
1.5.2. The challenge is getting the logs for the first attempt. After
checking YARN I discovered that in both the first attempt and the second one
the application master (job manager) gets assigned the same container id (is
this expected?). In this case, are the logs from the first attempt overwritten?
I found that *setKeepContainersAcrossApplicationAttempts* is enabled here.


The second challenge is understanding whether the job will be restored in the new
application attempt, or whether the new attempt will just have Flink
running without any job.


Regards,
Pawel

*First attempt:*

[pawel_bartoszek@ip-10-4-X-X ~]$ yarn container -list
appattempt_1538570922803_0020_01
18/10/08 10:16:16 INFO client.RMProxy: Connecting to ResourceManager at
ip-10-4-X-X.eu-west-1.compute.internal/10.4.108.26:8032
Total number of containers :1
  Container-Id   Start Time  Finish Time
 State HostNode Http Address
  LOG-URL
container_1538570922803_0020_02_01 Mon Oct 08 09:47:17 + 2018
   N/A  RUNNING
ip-10-4-X-X.eu-west-1.compute.internal:8041
http://ip-10-4-X-X.eu-west-1.compute.internal:8042
http://ip-10-4-X-X.eu-west-1.compute.internal:8042/node/containerlogs/container_1538570922803_0020_02_01/pawel_bartoszek

*Second attempt:*
[pawel_bartoszek@ip-10-4-X-X ~]$ yarn container -list
appattempt_1538570922803_0020_02
18/10/08 10:16:37 INFO client.RMProxy: Connecting to ResourceManager at
ip-10-4-X-X.eu-west-1.compute.internal/10.4.X.X:8032
Total number of containers :1
  Container-Id   Start Time  Finish Time
 State HostNode Http Address
  LOG-URL
container_1538570922803_0020_02_01 Mon Oct 08 09:47:17 + 2018
   N/A  RUNNING
ip-10-4-X-X.eu-west-1.compute.internal:8041
http://ip-10-4-X-X.eu-west-1.compute.internal:8042
http://ip-10-4-X-X.eu-west-1.compute.internal:8042/node/containerlogs/container_1538570922803_0020_02_01/pawel_bartoszek


Re: [DISCUSS] Breaking the Scala API for Scala 2.12 Support

2018-10-08 Thread Aljoscha Krettek
I have an open PR that does everything we can do for preparing the code base 
for Scala 2.12 without breaking the API: 
https://github.com/apache/flink/pull/6784

> On 8. Oct 2018, at 09:56, Chesnay Schepler  wrote:
> 
> I'd rather not maintain 2 master branches. Beyond the maintenance overhead I'm
> wondering about the benefit, as the API break still has to happen at some 
> point.
> 
> @Aljoscha how much work for supporting scala 2.12 can be merged without 
> breaking the API?
> If this is the only blocker I suggest to make the breaking change in 1.8.
> 
> On 05.10.2018 10:31, Till Rohrmann wrote:
>> Thanks Aljoscha for starting this discussion. The described problem brings
>> us indeed a bit into a pickle. Even with option 1) I think it is somewhat
>> API breaking because everyone who used lambdas without types needs to add
>> them now. Consequently, I only see two real options out of the ones you've
>> proposed:
>> 
>> 1) Disambiguate the API (either by removing
>> reduceGroup(GroupReduceFunction) or by renaming it to reduceGroupJ)
>> 2) Maintain a 2.11 and 2.12 master branch until we phase 2.11 completely out
>> 
>> Removing the reduceGroup(GroupReduceFunction) in option 1 is a bit
>> problematic because then all Scala API users who have implemented a
>> GroupReduceFunction need to convert it into a Scala lambda. Moreover, I
>> think it will be problematic with RichGroupReduceFunction which you need to
>> get access to the RuntimeContext.
>> 
>> Maintaining two master branches puts a lot of burden onto the developers to
>> always keep the two branches in sync. Ideally I would like to avoid this.
>> 
>> I also played a little bit around with implicit conversions to add the
>> lambda methods in Scala 2.11 on demand, but I was not able to get it to work
>> smoothly.
>> 
>> I'm cross posting this thread to user as well to get some more user
>> feedback.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Oct 4, 2018 at 7:36 PM Elias Levy 
>> wrote:
>> 
>>> The second alternative, with the addition of methods that take functions
>>> with Scala types, seems the most sensible.  I wonder if there is a need
>>> then to maintain the *J Java parameter methods, or whether users could just
>>> access the functionality by converting the Scala DataStreams to Java via
>>> .javaStream and whatever the equivalent is for DataSets.
>>> 
>>> On Thu, Oct 4, 2018 at 8:10 AM Aljoscha Krettek 
>>> wrote:
>>> 
 Hi,
 
 I'm currently working on
>>> https://issues.apache.org/jira/browse/FLINK-7811,
 with the goal of adding support for Scala 2.12. There is a bit of a
>>> hurdle
 and I have to explain some context first.
 
 With Scala 2.12, lambdas are implemented using the lambda mechanism of
 Java 8, i.e. Scala lambdas are now SAMs (Single Abstract Method). This
 means that the following two method definitions can both take a lambda:
 
 def map[R](mapper: MapFunction[T, R]): DataSet[R]
 def map[R](fun: T => R): DataSet[R]
 
 The Scala compiler gives precedence to the lambda version when you call
 map() with a lambda in simple cases, so it works here. You could still
>>> call
 map() with a lambda if the lambda version of the method weren't here
 because they are now considered the same. For Scala 2.11 we need both
 signatures, though, to allow calling with a lambda and with a
>>> MapFunction.
 The problem is with more complicated method signatures, like:
 
 def reduceGroup[R](fun: (scala.Iterator[T], Collector[R]) => Unit):
 DataSet[R]
 
 def reduceGroup[R](reducer: GroupReduceFunction[T, R]): DataSet[R]
 
 (for reference, GroupReduceFunction is a SAM with void
 reduce(java.lang.Iterable values, Collector out))
 
 These two signatures are not the same but similar enough for the Scala
 2.12 compiler to "get confused". In Scala 2.11, I could call
>>> reduceGroup()
 with a lambda that doesn't have parameter type definitions and things
>>> would
 be fine. With Scala 2.12 I can't do that because the compiler can't
>>> figure
 out which method to call and requires explicit type definitions on the
 lambda parameters.
 
 I see some solutions for this:
 
 1. Keep the methods as is, this would force people to always explicitly
 specify parameter types on their lambdas.
 
 2. Rename the second method to reduceGroupJ() to signal that it takes a
 user function that takes Java-style interfaces (the first parameter is
 java.lang.Iterable while the Scala lambda takes a scala.Iterator). This
 disambiguates the code, users can use lambdas without specifying explicit
 parameter types but breaks the API.
 
 One effect of 2. would be that we can add a reduceGroup() method that
 takes a api.scala.GroupReduceFunction that takes proper Scala types, thus
 it would allow people to implement user functions without having to cast
 the various Iterator/Iterable parameters.
 

Re: [DISCUSS] Dropping flink-storm?

2018-10-08 Thread Till Rohrmann
Good point. The initial idea of this thread was to remove the storm
compatibility layer completely.

During the discussion I realized that it might be useful for our users to
not completely remove it in one go. Instead for those who still want to use
some Bolt and Spout code in Flink, it could be nice to keep the wrappers.
At least, we could remove flink-storm in a more graceful way by first
removing the Topology and client parts and then the wrappers. What do you
think?

Cheers,
Till

On Mon, Oct 8, 2018 at 11:13 AM Chesnay Schepler  wrote:

> I don't believe that to be the consensus. For starters it is
> contradictory; we can't *drop* flink-storm yet still *keep* *some parts*.
>
> From my understanding we drop flink-storm completely, and put a note in
> the docs that the bolt/spout wrappers of previous versions will continue to
> work.
>
> On 08.10.2018 11:04, Till Rohrmann wrote:
>
> Thanks for opening the issue Chesnay. I think the overall consensus is to
> drop flink-storm and only keep the Bolt and Spout wrappers. Thanks for your
> feedback!
>
> Cheers,
> Till
>
> On Mon, Oct 8, 2018 at 9:37 AM Chesnay Schepler 
> wrote:
>
>> I've created https://issues.apache.org/jira/browse/FLINK-10509 for
>> removing flink-storm.
>>
>> On 28.09.2018 15:22, Till Rohrmann wrote:
>> > Hi everyone,
>> >
>> > I would like to discuss how to proceed with Flink's storm compatibility
>> > layer flink-storm.
>> >
>> > While working on removing Flink's legacy mode, I noticed that some
>> parts of
>> > flink-storm rely on the legacy Flink client. In fact, at the moment
>> > flink-storm does not work together with Flink's new distributed
>> > architecture.
>> >
>> > I'm also wondering how many people are actually using Flink's Storm
>> > compatibility layer and whether it would be worth porting it.
>> >
>> > I see two options how to proceed:
>> >
>> > 1) Commit to maintain flink-storm and port it to Flink's new
>> architecture
>> > 2) Drop flink-storm
>> >
>> > I doubt that we can contribute it to Apache Bahir [1], because once we
>> > remove the legacy mode, this module will no longer work with all newer
>> > Flink versions.
>> >
>> > Therefore, I would like to hear your opinion on this and in particular
>> if
>> > you are using or planning to use flink-storm in the future.
>> >
>> > [1] https://github.com/apache/bahir-flink
>> >
>> > Cheers,
>> > Till
>> >
>>
>>
>


Re: [DISCUSS] Dropping flink-storm?

2018-10-08 Thread Chesnay Schepler
I don't believe that to be the consensus. For starters it is 
contradictory; we can't /drop/ flink-storm yet still /keep/ /some parts/.


From my understanding we drop flink-storm completely, and put a note in 
the docs that the bolt/spout wrappers of previous versions will continue 
to work.


On 08.10.2018 11:04, Till Rohrmann wrote:
Thanks for opening the issue Chesnay. I think the overall consensus is 
to drop flink-storm and only keep the Bolt and Spout wrappers. Thanks 
for your feedback!


Cheers,
Till

On Mon, Oct 8, 2018 at 9:37 AM Chesnay Schepler wrote:


I've created https://issues.apache.org/jira/browse/FLINK-10509 for
removing flink-storm.

On 28.09.2018 15:22, Till Rohrmann wrote:
> Hi everyone,
>
> I would like to discuss how to proceed with Flink's storm
compatibility
> layer flink-storm.
>
> While working on removing Flink's legacy mode, I noticed that
some parts of
> flink-storm rely on the legacy Flink client. In fact, at the moment
> flink-storm does not work together with Flink's new distributed
> architecture.
>
> I'm also wondering how many people are actually using Flink's Storm
> compatibility layer and whether it would be worth porting it.
>
> I see two options how to proceed:
>
> 1) Commit to maintain flink-storm and port it to Flink's new
architecture
> 2) Drop flink-storm
>
> I doubt that we can contribute it to Apache Bahir [1], because
once we
> remove the legacy mode, this module will no longer work with all
newer
> Flink versions.
>
> Therefore, I would like to hear your opinion on this and in
particular if
> you are using or planning to use flink-storm in the future.
>
> [1] https://github.com/apache/bahir-flink
>
> Cheers,
> Till
>





Re: what's the meaning of latency indicator reported by flink metrics through prometheus?

2018-10-08 Thread Chesnay Schepler

1) correct
2) is the number of latency measurements; due to the random distribution of 
latency markers this value can be surprisingly low, depending on the 
configured latency marker interval (see the sketch below)

3) I don't know, but it isn't exposed by Flink.
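
(For reference, a minimal sketch of how the marker interval can be adjusted from
the application; the interval value and the dummy pipeline are made up, only
setLatencyTrackingInterval is the relevant call.)

import org.apache.flink.streaming.api.scala._

object LatencyTrackingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Emit a latency marker from every source every 5 seconds (value in ms).
    // More frequent markers mean more samples behind the *_latency_count and
    // histogram metrics, at the cost of a little extra overhead.
    env.getConfig.setLatencyTrackingInterval(5000L)

    // Dummy pipeline, just so the sketch runs.
    env.fromElements(1, 2, 3).map(_ + 1).print()
    env.execute("latency-tracking-sketch")
  }
}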

On 08.10.2018 10:17, varuy322 wrote:

Hi there,

I have integrated Kafka with Flink 1.5, and I use Prometheus and
Grafana to display the Flink 1.5 metrics;
now I get three indicators about latency, as below:
1)flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency(short
for latency)
2)flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency_count(short
for latency_count)
3)flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency_sum(short
for latency_sum)

It seems 1) is the latency value of one real subtask;
for 2), latency_count is always equal to a constant, in my example the value is
128;
for 3), latency_sum is always equal to zero.

could you tell me what's the meaning of 2) and 3)?

Best regards & Thanks.
Rui, Wang





-
stay hungry, stay foolish.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: [DISCUSS] Dropping flink-storm?

2018-10-08 Thread Till Rohrmann
Thanks for opening the issue Chesnay. I think the overall consensus is to
drop flink-storm and only keep the Bolt and Spout wrappers. Thanks for your
feedback!

Cheers,
Till

On Mon, Oct 8, 2018 at 9:37 AM Chesnay Schepler  wrote:

> I've created https://issues.apache.org/jira/browse/FLINK-10509 for
> removing flink-storm.
>
> On 28.09.2018 15:22, Till Rohrmann wrote:
> > Hi everyone,
> >
> > I would like to discuss how to proceed with Flink's storm compatibility
> > layer flink-storm.
> >
> > While working on removing Flink's legacy mode, I noticed that some parts
> of
> > flink-storm rely on the legacy Flink client. In fact, at the moment
> > flink-storm does not work together with Flink's new distributed
> > architecture.
> >
> > I'm also wondering how many people are actually using Flink's Storm
> > compatibility layer and whether it would be worth porting it.
> >
> > I see two options how to proceed:
> >
> > 1) Commit to maintain flink-storm and port it to Flink's new architecture
> > 2) Drop flink-storm
> >
> > I doubt that we can contribute it to Apache Bahir [1], because once we
> > remove the legacy mode, this module will no longer work with all newer
> > Flink versions.
> >
> > Therefore, I would like to hear your opinion on this and in particular if
> > you are using or planning to use flink-storm in the future.
> >
> > [1] https://github.com/apache/bahir-flink
> >
> > Cheers,
> > Till
> >
>
>


Re: ***UNCHECKED*** Error while confirming Checkpoint

2018-10-08 Thread Stefan Richter
Hi Pedro,

unfortunately the interesting parts are all removed from the log; we already 
know about the exception itself. In particular, what I would like to see is 
which checkpoints were triggered and completed before the exception happened.

Best,
Stefan

> On 08.10.2018 at 10:23, PedroMrChaves wrote:
> 
> Hello,
> 
> Find attached the jobmanager.log. I've omitted the log lines from other
> runs, leaving only the job manager info and the run with the error.
> 
> jobmanager.log
> 
>   
> 
> 
> 
> Thanks again for your help.
> 
> Regards,
> Pedro.
> 
> 
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



what's the meaning of latency indicator reported by flink metrics through prometheus?

2018-10-08 Thread varuy322
Hi there,

I have integrated Kafka with Flink 1.5, and I use Prometheus and
Grafana to display the Flink 1.5 metrics;
now I get three indicators about latency, as below:
1)flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency(short
for latency)
2)flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency_count(short
for latency_count)
3)flink_taskmanager_job_latency_source_id_source_subtask_index_operator_id_operator_subtask_index_latency_sum(short
for latency_sum)

It seems 1) is the latency value of one real subtask;
for 2), latency_count is always equal to a constant, in my example the value is
128;
for 3), latency_sum is always equal to zero.

could you tell me what's the meaning of 2) and 3)?

Best regards & Thanks.
Rui, Wang


 



-
stay hungry, stay foolish.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2018-10-08 Thread Zhijiang(wangzhijiang999)
This deadlock can indeed occur in special scenarios.

Until the bug is fixed, we can work around the issue by not deploying the map and 
sink tasks in the same task manager.
Krishna, do these two tasks share a slot? If so, you can disable slot sharing 
for this job.

Or, I guess, we can set ExecutionMode#PIPELINED_FORCED so that no blocking 
result partitions are generated, to avoid this issue temporarily.
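
A minimal sketch of setting that execution mode on the batch environment (the
sample job is made up; only setExecutionMode is the relevant call):

import org.apache.flink.api.common.ExecutionMode
import org.apache.flink.api.scala._

object PipelinedForcedSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Force pipelined data exchanges so that no blocking (spillable) result
    // partitions are produced, which is where the reported deadlock occurs.
    env.getConfig.setExecutionMode(ExecutionMode.PIPELINED_FORCED)

    // Dummy job, just so the sketch runs; print() triggers execution.
    env.fromElements(1, 2, 3).map(_ * 2).print()
  }
}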

Best,
Zhijiang


--
From: Piotr Nowojski 
Sent: Thursday, October 4, 2018, 21:54
To: Aljoscha Krettek 
Cc: "Narayanaswamy, Krishna" ; Nico Kruber 
; user@flink.apache.org 
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when 
running a large job > 10k tasks

Hi,

Thanks for reporting the problem. This bug was previously unknown to us. I have 
created a jira ticket for this bug:
https://issues.apache.org/jira/browse/FLINK-10491

Unfortunately I’m not familiar with running batch jobs in Flink, so I don’t 
know if there is some hot fix or anything that can at least mitigate/decrease 
the probability of the bug for you until we fix it properly. 

Piotrek

On 4 Oct 2018, at 13:55, Aljoscha Krettek  wrote:
Hi,

this looks like a potential Flink bug. Looping in Nico and Piotr who have 
looked into that in the past. Could you please comment on that?

Best,
Aljoscha

On 3. Oct 2018, at 12:12, Narayanaswamy, Krishna  
wrote:

Hi,
I am trying to run one large single job graph which has > 10k tasks. The form 
of the graph is something like
DataSource -> Filter -> Map [...multiple]
Sink1
Sink2
I am using a parallelism of 10 with 1 slot per task manager and a memory 
allocation of 32G per TM. The JM is running with 8G.
Everything starts up and runs fine, with close to 6-7k tasks completing (this is 
variable and is mostly the source/filter/map portions), and then the graph 
just hangs. I managed to connect to the task managers and get a thread dump 
just in time, and found the following deadlock on one of the TMs, which 
seems to be holding up everything else.
Please could someone take a look and advise if there is something I could do or 
try out to fix this.
Marked below are the two isolated thread stacks involved in the deadlock:
Thread-1
"DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
waiting for monitor entry
 waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
java.util.ArrayDeque)
  at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
  at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
  at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
   - locked <0x2dfd> (a java.util.ArrayDeque)
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
  - locked <0x2da5> (a java.lang.Object)
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
  at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
  at 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
  at 
org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
  at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
  at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
  at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
  at 
org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
  at java.lang.Thread.run(Thread.java:745)
Thread-2
"Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor 
entry
  java.lang.Thread.State: BLOCKED
 blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
 waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release 
lock on <0x2dfd> (a java.util.ArrayDeque)
  at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
  at 

Re: [DISCUSS] Breaking the Scala API for Scala 2.12 Support

2018-10-08 Thread Chesnay Schepler
I'd rather not maintain 2 master branches. Beyond the maintenance 
overhead I'm
wondering about the benefit, as the API break still has to happen at 
some point.


@Aljoscha how much work for supporting scala 2.12 can be merged without 
breaking the API?

If this is the only blocker I suggest to make the breaking change in 1.8.

On 05.10.2018 10:31, Till Rohrmann wrote:

Thanks Aljoscha for starting this discussion. The described problem brings
us indeed a bit into a pickle. Even with option 1) I think it is somewhat
API breaking because everyone who used lambdas without types needs to add
them now. Consequently, I only see two real options out of the ones you've
proposed:

1) Disambiguate the API (either by removing
reduceGroup(GroupReduceFunction) or by renaming it to reduceGroupJ)
2) Maintain a 2.11 and 2.12 master branch until we phase 2.11 completely out

Removing the reduceGroup(GroupReduceFunction) in option 1 is a bit
problematic because then all Scala API users who have implemented a
GroupReduceFunction need to convert it into a Scala lambda. Moreover, I
think it will be problematic with RichGroupReduceFunction which you need to
get access to the RuntimeContext.

Maintaining two master branches puts a lot of burden onto the developers to
always keep the two branches in sync. Ideally I would like to avoid this.

I also played a little bit around with implicit conversions to add the
lambda methods in Scala 2.11 on demand, but I was not able to get it work
smoothly.

I'm cross posting this thread to user as well to get some more user
feedback.

Cheers,
Till

On Thu, Oct 4, 2018 at 7:36 PM Elias Levy 
wrote:


The second alternative, with the addition of methods that take functions
with Scala types, seems the most sensible.  I wonder if there is a need
then to maintain the *J Java parameter methods, or whether users could just
access the functionality by converting the Scala DataStreams to Java via
.javaStream and whatever the equivalent is for DataSets.

On Thu, Oct 4, 2018 at 8:10 AM Aljoscha Krettek 
wrote:


Hi,

I'm currently working on https://issues.apache.org/jira/browse/FLINK-7811,
with the goal of adding support for Scala 2.12. There is a bit of a hurdle
and I have to explain some context first.

With Scala 2.12, lambdas are implemented using the lambda mechanism of
Java 8, i.e. Scala lambdas are now SAMs (Single Abstract Method). This
means that the following two method definitions can both take a lambda:

def map[R](mapper: MapFunction[T, R]): DataSet[R]
def map[R](fun: T => R): DataSet[R]

The Scala compiler gives precedence to the lambda version when you call
map() with a lambda in simple cases, so it works here. You could still call
map() with a lambda if the lambda version of the method weren't here
because they are now considered the same. For Scala 2.11 we need both
signatures, though, to allow calling with a lambda and with a MapFunction.

The problem is with more complicated method signatures, like:

def reduceGroup[R](fun: (scala.Iterator[T], Collector[R]) => Unit):
DataSet[R]

def reduceGroup[R](reducer: GroupReduceFunction[T, R]): DataSet[R]

(for reference, GroupReduceFunction is a SAM with void
reduce(java.lang.Iterable<T> values, Collector<O> out))

These two signatures are not the same but similar enough for the Scala
2.12 compiler to "get confused". In Scala 2.11, I could call reduceGroup()
with a lambda that doesn't have parameter type definitions and things would
be fine. With Scala 2.12 I can't do that because the compiler can't figure
out which method to call and requires explicit type definitions on the
lambda parameters.
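
To illustrate (a made-up sketch, not taken from the Flink code base): under Scala
2.12 the reduceGroup() lambda below only compiles once the parameter types are
written out, while Scala 2.11 also accepts it without them.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object ReduceGroupAmbiguitySketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(("a", 1), ("a", 2), ("b", 3))

    // With Scala 2.12 the compiler cannot choose between the lambda overload
    // and the SAM overload unless the parameter types are spelled out:
    val sums = input.reduceGroup {
      (in: Iterator[(String, Int)], out: Collector[Int]) =>
        out.collect(in.map(_._2).sum)
    }

    sums.print()
  }
}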

I see some solutions for this:

1. Keep the methods as they are; this would force people to always explicitly
specify parameter types on their lambdas.

2. Rename the second method to reduceGroupJ() to signal that it takes a
user function that takes Java-style interfaces (the first parameter is
java.lang.Iterable while the Scala lambda takes a scala.Iterator). This
disambiguates the code: users can use lambdas without specifying explicit
parameter types, but it breaks the API.

One effect of 2. would be that we can add a reduceGroup() method that
takes an api.scala.GroupReduceFunction that takes proper Scala types; this
would allow people to implement user functions without having to cast
the various Iterator/Iterable parameters.
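
For comparison, a made-up sketch of what implementing the Java-style interface
from Scala looks like today (class and value names are invented): the Java
interface hands the user a java.lang.Iterable that has to be converted, whereas
the Scala lambda overload above works on a scala.Iterator directly.

import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Java-style interface: the reduce() parameter is a java.lang.Iterable,
// so the Scala implementation converts it via asScala.
class SumReducer extends GroupReduceFunction[(String, Int), Int] {
  override def reduce(values: java.lang.Iterable[(String, Int)],
                      out: Collector[Int]): Unit =
    out.collect(values.asScala.map(_._2).sum)
}

object JavaStyleGroupReduceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements(("a", 1), ("a", 2), ("b", 3))
      .reduceGroup(new SumReducer)
      .print()
  }
}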

Either way, people would have to adapt their code when moving to Scala
2.12 in some way, depending on what style of methods they use.

There is also solution 2.5:

2.5 Rename the methods only in the Scala 2.12 build of Flink and keep the
old method names for Scala 2.11. This would require some infrastructure and
I don't yet know how it can be done in a sane way.

What do you think? I personally would be in favour of 2. but it breaks the
existing API.

Best,
Aljoscha








Re: Flink Python streaming

2018-10-08 Thread Chesnay Schepler

Hello,

to use libraries you have to supply them when submitting the job as 
described below. Additional directories/files will be placed in the same 
directory as your script on each TM.


See 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html#executing-plans


Note that libraries that make use of C-extensions will probably not work.

On 03.10.2018 09:14, Fabian Hueske wrote:

Hi,

AFAIK it's not that easy. Flink's Python support is based on Jython, 
which translates Python code into JVM bytecode. Therefore, native 
libs are not supported.


Chesnay (in CC) knows the details here.

Best, Fabian

Hequn Cheng <chenghe...@gmail.com> 
wrote on Wed., 3 Oct 2018, 04:30:


Hi Bing,

I'm not familiar with Python programming. I guess we can simply
import libraries in the Python script. An example can be found here [1].
Hope this helps.

Best, Hequn
[1]

https://github.com/wdm0006/flink-python-examples/blob/master/mandelbrot/mandelbrot_set.py

On Wed, Oct 3, 2018 at 1:49 AM Bing Lin <bingl...@gmail.com> wrote:

Hi,

I'm wondering how I can add dependencies for third party and
custom libraries to be executed in Flink for python streaming?

Thank you,

Bing





Re: [DISCUSS] Dropping flink-storm?

2018-10-08 Thread Chesnay Schepler
I've created https://issues.apache.org/jira/browse/FLINK-10509 for 
removing flink-storm.


On 28.09.2018 15:22, Till Rohrmann wrote:

Hi everyone,

I would like to discuss how to proceed with Flink's storm compatibility
layer flink-storm.

While working on removing Flink's legacy mode, I noticed that some parts of
flink-storm rely on the legacy Flink client. In fact, at the moment
flink-storm does not work together with Flink's new distributed
architecture.

I'm also wondering how many people are actually using Flink's Storm
compatibility layer and whether it would be worth porting it.

I see two options how to proceed:

1) Commit to maintain flink-storm and port it to Flink's new architecture
2) Drop flink-storm

I doubt that we can contribute it to Apache Bahir [1], because once we
remove the legacy mode, this module will no longer work with all newer
Flink versions.

Therefore, I would like to hear your opinion on this and in particular if
you are using or planning to use flink-storm in the future.

[1] https://github.com/apache/bahir-flink

Cheers,
Till





Re: Duplicates in self join

2018-10-08 Thread Dominik Wosiński
Hey,
IMHO, the simplest way in your case would be to use an Evictor to evict
duplicate values after the window is generated. Have a look at it here:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html

Best Regards,
Dominik.

On Mon, 8 Oct 2018 at 08:00, Eric L Goodman 
wrote:

> What is the best way to avoid or remove duplicates when joining a stream
> with itself?  I'm performing a streaming temporal triangle computation and
> the first part is to find triads of two edges of the form vertexA->vertexB
> and vertexB->vertexC (and there are temporal constraints where the first
> edge occurs before the second edge).  To do that, I have the following code:
>
> DataStream triads = edges.join(edges)
> .where(new DestKeySelector())
> .equalTo(new SourceKeySelector())
> .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
> Time.milliseconds(slideSizeMs)))
> .apply(new EdgeJoiner(queryWindow));
>
> However, when I look at the triads being built, there are two copies of each 
> triad.
>
> For example, if I create ten edges (time, source, target):
>
> 0.0, 4, 0
>
> 0.01, 1, 5
>
> 0.02, 3, 7
>
> 0.03, 0, 8
>
> 0.04, 0, 9
>
> 0.05, 4, 8
>
> 0.06, 4, 3
>
> 0.07, 5, 9
>
> 0.08, 7, 1
>
> 0.09, 9, 6
>
>
> It creates the following triads (time1, source1, target1, time2, source2,
> target2). Note there are two copies of each.
>
> 0.0, 4, 0 0.03, 0, 8
>
> 0.0, 4, 0 0.03, 0, 8
>
> 0.0, 4, 0 0.04, 0, 9
>
> 0.0, 4, 0 0.04, 0, 9
>
> 0.01, 1, 5 0.07, 5, 9
>
> 0.01, 1, 5 0.07, 5, 9
>
> 0.02, 3, 7 0.08, 7, 1
>
> 0.02, 3, 7 0.08, 7, 1
>
> 0.04, 0, 9 0.09, 9, 6
>
> 0.04, 0, 9 0.09, 9, 6
>
> 0.07, 5, 9 0.09, 9, 6
>
> 0.07, 5, 9 0.09, 9, 6
>
> I'm assuming this behavior has something to do with the joining of "edges" 
> with itself.
>
> I can provide more code if that would be helpful, but I believe I've captured 
> the most salient portion.
>
>
>
>
>
>


Duplicates in self join

2018-10-08 Thread Eric L Goodman
What is the best way to avoid or remove duplicates when joining a stream
with itself?  I'm performing a streaming temporal triangle computation and
the first part is to find triads of two edges of the form vertexA->vertexB
and vertexB->vertexC (and there are temporal constraints where the first
edge occurs before the second edge).  To do that, I have the following code:

DataStream triads = edges.join(edges)
.where(new DestKeySelector())
.equalTo(new SourceKeySelector())
.window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
Time.milliseconds(slideSizeMs)))
.apply(new EdgeJoiner(queryWindow));

However, when I look at the triads being built, there are two copies
of each triad.

For example, if I create ten edges (time, source, target):

0.0, 4, 0

0.01, 1, 5

0.02, 3, 7

0.03, 0, 8

0.04, 0, 9

0.05, 4, 8

0.06, 4, 3

0.07, 5, 9

0.08, 7, 1

0.09, 9, 6


It creates the following triads (time1, source1, target1, time2, source2,
target2). Note there are two copies of each.

0.0, 4, 0 0.03, 0, 8

0.0, 4, 0 0.03, 0, 8

0.0, 4, 0 0.04, 0, 9

0.0, 4, 0 0.04, 0, 9

0.01, 1, 5 0.07, 5, 9

0.01, 1, 5 0.07, 5, 9

0.02, 3, 7 0.08, 7, 1

0.02, 3, 7 0.08, 7, 1

0.04, 0, 9 0.09, 9, 6

0.04, 0, 9 0.09, 9, 6

0.07, 5, 9 0.09, 9, 6

0.07, 5, 9 0.09, 9, 6

I'm assuming this behavior has something to do with the joining of
"edges" with itself.

I can provide more code if that would be helpful, but I believe I've
captured the most salient portion.