Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-13 Thread Renjie Liu
Great!

On Wed, Dec 14, 2016 at 1:30 AM Matthias J. Sax  wrote:

> -BEGIN PGP SIGNED MESSAGE-
> Hash: SHA512
>
> I think it's worth to announce this via news list. :)
>
> On 12/13/16 7:32 AM, Robert Metzger wrote:
> > The commun...@flink.apache.org 
> > has been created :)
> >
> > On Tue, Dec 13, 2016 at 10:43 AM, Robert Metzger
> > > wrote:
> >
> > +1. I've requested the community@ mailing list from infra.
> >
> > On Tue, Dec 13, 2016 at 10:40 AM, Kostas Tzoumas
> > > wrote:
> >
> > It seems that several folks are excited about the idea - but there
> > is still a concern on whether this would be spam for the dev@ and
> > user@ lists (which I share)
> >
> > As a compromise, I propose to request a new mailing list (
> > commun...@flink.apache.org )
> > which we can use for this purpose, and also to post upcoming
> > meetups, conferences, etc. In order to inform the community about
> > this mailing list, we can cc the dev@ and user@ lists in the first
> > months until the new mailing list has ramped up.
> >
> > On Fri, Dec 9, 2016 at 4:55 PM, Greg Hogan  > > wrote:
> >
> >> Google indexes the mailing list. Anyone can filter the
> > messages to trash
> >> in a few clicks.
> >>
> >> This will also be a means for the community to better
> > understand which and
> >> how companies are using Flink.
> >>
> >> On Fri, Dec 9, 2016 at 8:27 AM, Felix Neutatz
> > >
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I wonder whether a mailing list is a good choice for that in
> > general. If
> >>> I am looking for a job I won't register for a mailing list or
> > browse
> >>> through the archive of one but rather search it via Google.
> > So what about
> >>> putting it on a dedicated site on the Web Page. This feels
> > more intuitive
> >>> to me and gives a better overview.
> >>>
> >>> Best regards, Felix
> >>>
> >>> On Dec 9, 2016 14:20, "Ufuk Celebi"  > > wrote:
> >>>
> >>>
> >>>
> >>>
> >>> On 9 December 2016 at 14:13:14, Robert Metzger
> > (rmetz...@apache.org )
> >>> wrote:
>  I'm against using the news@ list for that. The promise of the
>  news@ list is that its low-traffic and
> > only for
> >>> news. If
>  we now start having job offers (and potentially some
> > questions on them
>  etc.) it'll be a list with more than some announcements.
>  That's also the reason why the news@ list is completely
> > moderated.
> >>>
> >>> I agree with Robert. I would consider that to be spam if
> > posted to news@.
> >>>
> >>>
> >>>
> >>>
> >>
> >
> >
> >
> -BEGIN PGP SIGNATURE-
> Comment: GPGTools - https://gpgtools.org
>
> iQIYBAEBCgAGBQJYUC/vAAoJELz8Z8hxAGOiMx0P31O4S270PAFJbGkYn6p7zZDH
> ox3q/38DGU4MO00I5oieul+KE3lS00JHzRMqejXNdekDhtqmn5hMZtvRvwYS7kJv
> kmuPrcxWOVtV9PGAR8i/cv7pgx6rDUXV4TpnIlc8XQc3qxSraykggZajN0VJ57NX
> gO7fwsWzyh1lHHdVPI0KamqXKFDZVA+X3SY6Ml+gDJE4q5vvDQi5TXa9C96jn2it
> xyDY4uDz1SnMqdIiSFx+F6Dba9gXjeoc0WGFYpq88u7D5OVwdF3S/sMdoKhcYsC8
> eKKNQgnhAl/K5aYxA3v5EfI1eA/DHpIqgW2VEsJbU553PZ9PR/ZG2pnXVgVE70IH
> 6koHyBc/zlYc0BmOfJMcjpBfkeEJib1emKdpRiWB0RSXy2vM0sbHSMTlmUKSkGCh
> A5Zza3+YbRec+ylcGdu+l0BKjriLa32gsPraWZCVVw+NcBKlA1Qxeqp5jwIyoW1r
> fLjTe8+0DPYQ18Ufijtxa/iedGmBVYONhi1PhpE5cuSVDxBkUiJRqDe/SCCGj1Oi
> 1qDiR3imEaPmHCg6de6lF8MOzSm+CkgAjAXsjKv5kWVoiU6B+DVHQKrwVP/0CN+J
> K/IjTGpqYzXbZnE+Vadofh9YpzwCHU9YadTms0oTrjRNOuHJi8rA0pPF3HaxEWDU
> QRowo2+ah3PF4dA=
> =cf9j
> -END PGP SIGNATURE-
>
-- 
Liu, Renjie
Software Engineer, MVAD


Re: checkpoint notifier not found?

2016-12-13 Thread Abhishek R. Singh
Not sure how to go from here. How do I create a PR for this?

$ git branch
* doc-checkpoint-notify
  master


$ git push origin master
remote: Permission to apache/flink.git denied to abhishsi.
fatal: unable to access 'https://github.com/apache/flink.git/': The requested 
URL returned error: 403


$ git diff HEAD^1
diff --git a/docs/dev/state.md b/docs/dev/state.md
index 37de0a8..a753ed1 100644
--- a/docs/dev/state.md
+++ b/docs/dev/state.md
@@ -281,7 +281,7 @@ public static class CounterSource
 }
 {% endhighlight %}
 
-Some operators might need the information when a checkpoint is fully 
acknowledged by Flink to communicate that with the outside world. In this case 
see the `flink.streaming.api.checkpoint.CheckpointNotifier` interface.
+Some operators might need the information when a checkpoint is fully 
acknowledged by Flink to communicate that with the outside world. In this case 
see the `org.apache.flink.runtime.state.CheckpointListener` interface.
 
 ## State Checkpoints in Iterative Jobs
 


> On Dec 12, 2016, at 3:11 PM, Abhishek R. Singh 
>  wrote:
> 
> https://issues.apache.org/jira/browse/FLINK-5323 
> 
> 
>> On Dec 12, 2016, at 5:37 AM, Till Rohrmann > > wrote:
>> 
>> Hi Abhishek,
>> 
>> great to hear that you like to become part of the Flink community. Here are 
>> some information for how to contribute [1].
>> 
>> [1] http://flink.apache.org/how-to-contribute.html 
>> 
>> 
>> Cheers,
>> Till
>> 
>> On Mon, Dec 12, 2016 at 12:36 PM, Abhishek Singh 
>> > 
>> wrote:
>> Will be happy to. Could you guide me a bit in terms of what I need to do?
>> 
>> I am a newbie to open source contributing. And currently at Frankfurt 
>> airport. When I hit ground will be happy to contribute back. Love the 
>> project !!
>> 
>> Thanks for the awesomeness. 
>> 
>> 
>> On Mon, Dec 12, 2016 at 12:29 PM Stephan Ewen > > wrote:
>> Thanks for reporting this.
>> It would be awesome if you could file a JIRA or a pull request for fixing 
>> the docs for that.
>> 
>> On Sat, Dec 10, 2016 at 2:02 AM, Abhishek R. Singh 
>> > 
>> wrote:
>> I was following the official documentation: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
>>  
>> 
>> 
>> Looks like this is the right one to be using: import 
>> org.apache.flink.runtime.state.CheckpointListener;
>> 
>> -Abhishek-
>> 
>>> On Dec 9, 2016, at 4:30 PM, Abhishek R. Singh 
>>> > 
>>> wrote:
>>> 
>>> I can’t seem to find CheckpointNotifier. Appreciate help !
>>> 
>>> CheckpointNotifier is not a member of package 
>>> org.apache.flink.streaming.api.checkpoint
>>> 
>>> From my pom.xml:
>>> 
>>> 
>>> org.apache.flink
>>> flink-scala_2.11
>>> 1.1.3
>>> 
>>> 
>>> org.apache.flink
>>> flink-streaming-scala_2.11
>>> 1.1.3
>>> 
>>> 
>>> org.apache.flink
>>> flink-clients_2.11
>>> 1.1.3
>>> 
>>> 
>>> org.apache.flink
>>> flink-statebackend-rocksdb_2.11
>>> 1.1.3
>>> 
>> 
>> 
>> 
> 



Re: How to retrieve values from yarn.taskmanager.env in a Job?

2016-12-13 Thread Shannon Carey
Till,

Unfortunately, System.getenv() doesn't contain the expected variable even 
within the UDFs, but thanks for the info!

In the Yarn setting, "the client" would be either:

  1.  the bin/flink executable (with configuration based on where it's run 
from… which might not be the same as the destination Flink cluster) OR
  2.  the web UI… the job planning runs in the existing JVM of the web UI? That 
runs as part of the Job Manager, right? This is the primary method by which we 
launch jobs, currently.

Is that right?

I will try out "env.java.opts" to see if that has any effect.

-Shannon

From: Till Rohrmann >
Date: Tuesday, December 13, 2016 at 4:34 AM
To: >
Cc: Chesnay Schepler >
Subject: Re: How to retrieve values from yarn.taskmanager.env in a Job?

Hi Shannon,

the job graph generation does not run in the task manager but on the client. 
The job graph is then submitted to the JobManager which then will deploy the 
individual tasks to the TaskManager. Thus, when generating the job graph the 
task manager environment variables are not accessible.

Thus, you are only able to access these environment variables from within your 
UDFs.

What you could do is to union all configuration objects and then reading only 
those entries relevant for a specific environment on the task manager, e.g. 
open method of a RichFunction.

Cheers,
Till

On Mon, Dec 12, 2016 at 7:12 PM, Shannon Carey 
> wrote:
Hi Chesnay,

Since that configuration option is supposed to apply the environment variables 
to the task managers, I figured it would definitely be available within the 
stream operators. I'm not sure whether the job plan runs within a task manager 
or not, but hopefully it does?

In my particular code, I want to get the name of the environment in order to 
read the correct configuration file(s) so that properly populated config 
objects can be passed to various operators. Therefore, it would be sufficient 
for the job plan execution to have access to the environment. All the operators 
are capable of persisting any necessary configuration through serialization.

It really can work either way, but I think it'd be easiest if it was available 
everywhere. If it's only available during job planning then you have to make 
sure to serialize it everywhere you need it, and if it's only available during 
operator execution then it's less straightforward to do central configuration 
work. Either way it's lying in wait for a programmer to forget where it's 
accessible vs. not.

-Shannon

From: Chesnay Schepler >
Date: Monday, December 12, 2016 at 7:36 AM
To: >
Subject: Re: How to retrieve values from yarn.taskmanager.env in a Job?

Hello,

can you clarify one small thing for me: Do you want to access this parameter 
when you define the plan
(aka when you call methods on the StreamExecutionEnvironment or DataStream 
instances)
or from within your functions/operators?

Regards,
Chesnay Schepler


On 12.12.2016 14:21, Till Rohrmann wrote:

Hi Shannon,

have you tried accessing the environment variables via System.getenv()? This 
should give you a map of string-string key value pairs where the key is the 
environment variable name.

If your values are not set in the returned map, then this indicates a bug in 
Flink and it would be great if you could open a JIRA issue.

Cheers,
Till

​

On Fri, Dec 9, 2016 at 7:33 PM, Shannon Carey 
> wrote:
This thread 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/passing-environment-variables-to-flink-program-td3337.html
 describes the impetus for the addition of yarn.taskmanager.env.

I have configured a value within yarn.taskmanager.env, and I see it appearing 
in the Flink web UI in the list underneath Job Manager -> Configuration. 
However, I can't figure out how to retrieve the value from within a Flink job. 
It doesn't appear in the environment, the system properties, or my 
ParameterTool instance, and I can't figure out how I would get to it via the 
StreamExecutionEnvironment. Can anyone point me in the right direction?

All I want to do is inform my Flink jobs which environment they're running on, 
so that programmers don't have to specify the environment as a job parameter 
every time they run it. I also see that there is a "env.java.opts" 
configuration… does that work in YARN apps (would my jobs be able to see it?)

Thanks!
Shannon





PartitionedState and watermark of Window coGroup()

2016-12-13 Thread Sendoh
Hi Flink users,

I'm a bit confused about how these two work when writing trigger for window
coGroup(). 

Stream1.assignTimestampsAndWatermarks(new EventWatermark())
.coGroup(Stream2.assignTimestampsAndWatermarks(new
EventWatermark()))
.where(new JSONKey("key")).equalTo(new JSONKey("key"))
   
.window(TumblingEventTimeWindows.of(Time.days(7))).trigger(new
CoGroupTrigger())
.apply(new CoGroupFunction() {
...
...
}

The documentation describes(
https://github.com/apache/flink/blob/master/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala#L37)
*A streaming co-group operation is evaluated over elements in a window.*

Does it mean elements are not keyed by the joined key? 

Another question is does the watermark take the minimum timestamps of two
streams (or keyed streams, if elements are keyed)? 

Our trigger is going to fire the window when watermark has passed two keyed
streams' timestamps, because one stream is much smaller and consumed much
faster. A similar trigger has been implemented for keyed stream, which means
if we cannot make coGroup() behave as expected, we can still union those two
streams into one stream and use similar solution. 

Best,

Sendoh




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PartitionedState-and-watermark-of-Window-coGroup-tp10620.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-13 Thread Matthias J. Sax
-BEGIN PGP SIGNED MESSAGE-
Hash: SHA512

I think it's worth to announce this via news list. :)

On 12/13/16 7:32 AM, Robert Metzger wrote:
> The commun...@flink.apache.org 
> has been created :)
> 
> On Tue, Dec 13, 2016 at 10:43 AM, Robert Metzger
> > wrote:
> 
> +1. I've requested the community@ mailing list from infra.
> 
> On Tue, Dec 13, 2016 at 10:40 AM, Kostas Tzoumas 
> > wrote:
> 
> It seems that several folks are excited about the idea - but there
> is still a concern on whether this would be spam for the dev@ and
> user@ lists (which I share)
> 
> As a compromise, I propose to request a new mailing list ( 
> commun...@flink.apache.org ) 
> which we can use for this purpose, and also to post upcoming
> meetups, conferences, etc. In order to inform the community about
> this mailing list, we can cc the dev@ and user@ lists in the first 
> months until the new mailing list has ramped up.
> 
> On Fri, Dec 9, 2016 at 4:55 PM, Greg Hogan  > wrote:
> 
>> Google indexes the mailing list. Anyone can filter the
> messages to trash
>> in a few clicks.
>> 
>> This will also be a means for the community to better
> understand which and
>> how companies are using Flink.
>> 
>> On Fri, Dec 9, 2016 at 8:27 AM, Felix Neutatz
> >
>> wrote:
>> 
>>> Hi,
>>> 
>>> I wonder whether a mailing list is a good choice for that in
> general. If
>>> I am looking for a job I won't register for a mailing list or
> browse
>>> through the archive of one but rather search it via Google.
> So what about
>>> putting it on a dedicated site on the Web Page. This feels
> more intuitive
>>> to me and gives a better overview.
>>> 
>>> Best regards, Felix
>>> 
>>> On Dec 9, 2016 14:20, "Ufuk Celebi"  > wrote:
>>> 
>>> 
>>> 
>>> 
>>> On 9 December 2016 at 14:13:14, Robert Metzger
> (rmetz...@apache.org )
>>> wrote:
 I'm against using the news@ list for that. The promise of the
 news@ list is that its low-traffic and
> only for
>>> news. If
 we now start having job offers (and potentially some
> questions on them
 etc.) it'll be a list with more than some announcements. 
 That's also the reason why the news@ list is completely
> moderated.
>>> 
>>> I agree with Robert. I would consider that to be spam if
> posted to news@.
>>> 
>>> 
>>> 
>>> 
>> 
> 
> 
> 
-BEGIN PGP SIGNATURE-
Comment: GPGTools - https://gpgtools.org

iQIYBAEBCgAGBQJYUC/vAAoJELz8Z8hxAGOiMx0P31O4S270PAFJbGkYn6p7zZDH
ox3q/38DGU4MO00I5oieul+KE3lS00JHzRMqejXNdekDhtqmn5hMZtvRvwYS7kJv
kmuPrcxWOVtV9PGAR8i/cv7pgx6rDUXV4TpnIlc8XQc3qxSraykggZajN0VJ57NX
gO7fwsWzyh1lHHdVPI0KamqXKFDZVA+X3SY6Ml+gDJE4q5vvDQi5TXa9C96jn2it
xyDY4uDz1SnMqdIiSFx+F6Dba9gXjeoc0WGFYpq88u7D5OVwdF3S/sMdoKhcYsC8
eKKNQgnhAl/K5aYxA3v5EfI1eA/DHpIqgW2VEsJbU553PZ9PR/ZG2pnXVgVE70IH
6koHyBc/zlYc0BmOfJMcjpBfkeEJib1emKdpRiWB0RSXy2vM0sbHSMTlmUKSkGCh
A5Zza3+YbRec+ylcGdu+l0BKjriLa32gsPraWZCVVw+NcBKlA1Qxeqp5jwIyoW1r
fLjTe8+0DPYQ18Ufijtxa/iedGmBVYONhi1PhpE5cuSVDxBkUiJRqDe/SCCGj1Oi
1qDiR3imEaPmHCg6de6lF8MOzSm+CkgAjAXsjKv5kWVoiU6B+DVHQKrwVP/0CN+J
K/IjTGpqYzXbZnE+Vadofh9YpzwCHU9YadTms0oTrjRNOuHJi8rA0pPF3HaxEWDU
QRowo2+ah3PF4dA=
=cf9j
-END PGP SIGNATURE-


Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-13 Thread Robert Metzger
Exactly.

On Tue, Dec 13, 2016 at 4:40 PM, Timur Shenkao  wrote:

> How to subscribe?
> community-subscr...@flink.apache.org ?
>
> On Tue, Dec 13, 2016 at 6:32 PM, Robert Metzger 
> wrote:
>
>> The commun...@flink.apache.org has been created :)
>>
>> On Tue, Dec 13, 2016 at 10:43 AM, Robert Metzger 
>> wrote:
>>
>>> +1. I've requested the community@ mailing list from infra.
>>>
>>> On Tue, Dec 13, 2016 at 10:40 AM, Kostas Tzoumas 
>>> wrote:
>>>
 It seems that several folks are excited about the idea - but there is
 still
 a concern on whether this would be spam for the dev@ and user@ lists
 (which
 I share)

 As a compromise, I propose to request a new mailing list (
 commun...@flink.apache.org) which we can use for this purpose, and
 also to
 post upcoming meetups, conferences, etc. In order to inform the
 community
 about this mailing list, we can cc the dev@ and user@ lists in the
 first
 months until the new mailing list has ramped up.

 On Fri, Dec 9, 2016 at 4:55 PM, Greg Hogan  wrote:

 > Google indexes the mailing list. Anyone can filter the messages to
 trash
 > in a few clicks.
 >
 > This will also be a means for the community to better understand
 which and
 > how companies are using Flink.
 >
 > On Fri, Dec 9, 2016 at 8:27 AM, Felix Neutatz 
 > wrote:
 >
 >> Hi,
 >>
 >> I wonder whether a mailing list is a good choice for that in
 general. If
 >> I am looking for a job I won't register for a mailing list or browse
 >> through the archive of one but rather search it via Google. So what
 about
 >> putting it on a dedicated site on the Web Page. This feels more
 intuitive
 >> to me and gives a better overview.
 >>
 >> Best regards,
 >> Felix
 >>
 >> On Dec 9, 2016 14:20, "Ufuk Celebi"  wrote:
 >>
 >>
 >>
 >>
 >> On 9 December 2016 at 14:13:14, Robert Metzger (rmetz...@apache.org)
 >> wrote:
 >> > I'm against using the news@ list for that.
 >> > The promise of the news@ list is that its low-traffic and only for
 >> news. If
 >> > we now start having job offers (and potentially some questions on
 them
 >> > etc.) it'll be a list with more than some announcements.
 >> > That's also the reason why the news@ list is completely moderated.
 >>
 >> I agree with Robert. I would consider that to be spam if posted to
 news@.
 >>
 >>
 >>
 >>
 >

>>>
>>>
>>
>


Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing untill the whole stream is processed

2016-12-13 Thread Aljoscha Krettek
Hi Yassine,
I managed to reproduce the problem. The cause is that we recently changed
how the timer service is being cleaned up and now the watermark timers are
not firing anymore.

I'll keep you posted and hope to find a solution fast.

Cheers,
Aljoscha

On Sun, 11 Dec 2016 at 22:10 Yassine MARZOUGUI 
wrote:

> Hi Aljoscha,
>
> Please excuse me for the late response; I've been busy for the whole
> previous week.
> I used the custom watermark debugger (with 1.1, I changed 
> super.processWatermark(mark)
> to super.output.emitWatermark(mark)), surprisingly with 1.2, only one
> watremark is printed at the end of the stream with the value WM: Watermark
> @ 9223372036854775807 (Long.MAX_VALUE), whereas with 1.1, watermarks are
> printed periodically. I am  using the following revision of 1.2-SNAPSHOT :
> https://github.com/apache/flink/tree/4e336c692b74f218ba09844a46f49534e3a210e9
> .
>
> I uploaded the dataset I'm using as an input here :
> https://drive.google.com/file/d/0BzERCAJnxXocNGpMTGMzX09id1U/view?usp=sharing
>  ,the first column corresponds to the timestamp.
>
> You can find the code below. Thanks you for your help.
>
> import com.opencsv.CSVParser;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import
> org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
> import java.util.*;
>
> /**
>  * Created by ymarzougui on 11/1/2016.
>  */
> public class SortedSessionsAssigner {
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> DataStream> waterMarked =
> env.readTextFile("file:///E:\\data\\anonymized.csv")
> .flatMap(new RichFlatMapFunction Tuple3>() {
> public CSVParser csvParser;
>
> @Override
> public void open(Configuration config) {
> csvParser = new CSVParser(',', '"');
> }
>
> @Override
> public void flatMap(String in,
> Collector> clctr) throws Exception {
> String[] result = csvParser.parseLine(in);
> clctr.collect(Tuple3.of(Long.parseLong(result[0]),
> result[1], result[2]));
> }
> })
> .assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor>() {
> @Override
> public long
> extractAscendingTimestamp(Tuple3 tuple3) {
> return tuple3.f0;
> }
> });
>
> DataStream, Long>> sessions =
> waterMarked
> .keyBy(1)
> .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
> .apply(new
> WindowFunction,Tuple2,
> Long>, Tuple, TimeWindow>() {
>
> @Override
> public void apply(Tuple tuple, TimeWindow timeWindow,
> Iterable> iterable,
> Collector, Long>> collector) throws
> Exception {
> TreeMap treeMap = new
> TreeMap();
> Long session_count = 0L;
> for (Tuple3 tuple3 :
> iterable){
> treeMap.put(tuple3.f2,
> treeMap.getOrDefault(tuple3.f2, 0.0) + 1);
> session_count += 1;
> }
> collector.collect(Tuple2.of(treeMap,
> session_count));
>
> 

Standalone cluster layout

2016-12-13 Thread Avihai Berkovitz
Hi folks,

I am setting up a Flink cluster for testing, and I have a few questions 
regarding memory allocations:

  1.  Is there a recommended limit to the size of a TaskManager heap? I saw 
that Flink uses G1GC, so we can use dozens of GB.
  2.  Following the above question, should I use only one TaskManager process 
per machine, and give it all the available memory (minus a couple of GB for the 
OS)?
  3.  Should I reserve some memory for RocksDB? The partitioned state will be 
around 500GB in size, and to my understanding RocksDB runs in native code and 
so uses off-heap memory.
  4.  What is the recommended heap size of a JobManager? I expect that the 
cluster will run only 2 jobs at the same time.

The planned layout of the standalone cluster is:

  *   3 small JobManager machines, running:
 *   1 process of Zookeeper peer
 *   1 JobManager process
  *   N large TaskManager machines, each running 1 TM process

Thanks!
Avihai



RE: Equivalent of Rx combineLatest() on a join?

2016-12-13 Thread denis.dollfus
Thanks Gábor, indeed it appears to work as expected.

I found another way based on new evictors included in flink 1.2 (see 
FLINK-4174) that can remove elements anywhere in a window, for example based on 
element content.

However the CoFlatMap solution you suggest is definitely simpler, I'm going to 
dig further in this direction.

Regards,

Denis

-Original Message-
From: Gábor Gévay [mailto:gga...@gmail.com] 
Sent: mardi 13 décembre 2016 09:45
To: user@flink.apache.org
Subject: Re: Equivalent of Rx combineLatest() on a join?

Dear Denis,

I think you can do it with a simple CoFlatMapFunction (without windows):
To use a CoFlatMapFunction, you need to first connect [1] your streams, which 
results in a ConnectedStreams. Then you can call flatMap on this, and give a 
CoFlatMapFunction to it (where two different callbacks are executed when an 
element arrives on one of the two streams). What you could do, is to have two 
members in your CoFlatMapFunction that store the latest values from the two 
streams, and you update the appropriate one when an element arrives and also 
emit a combined value from them.

Best,
Gábor

[1] 
https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.1_api_java_org_apache_flink_streaming_api_datastream_DataStream.html-23connect-2Dorg.apache.flink.streaming.api.datastream.DataStream-2D=CwIFaQ=4ZIZThykDLcoWk-GVjSLm9hvvvzvGv0FLoWSRuCSs5Q=Jtkfol_Mg_eQUL17wcicdBE1Et94D7zgzS-8_bKvGlc=ZYuPMV-ZV7UqMGZmLQMxLRdvEMcFaZ_TKhaRe0WuWBA=BUpNqxUD1hBkAYXovGhnJMFhb-27W65ZF-AusAtsmKY=
 




2016-12-05 18:28 GMT+01:00  :
> Actually that doesn’t work as expected because emitted values are not 
> purged. I’ll experiment with purging triggers and/or evictors, though 
> I have the feeling that Flink was not designed for what we need to do 
> here -- but I’ll keep on searching.
>
>
>
> In the meantime any advice is appreciated. If the goal is not clear I 
> can provide more details.
>
>
>
> Thank you,
>
>
>
> Denis
>
>
>
> From: Dollfus, Denis (TR Technology & Ops)
> Sent: lundi 5 décembre 2016 16:31
> To: user@flink.apache.org
> Subject: RE: Equivalent of Rx combineLatest() on a join?
>
>
>
> Asking the response helped me to find the answer (yes, rubber duck
> debugging) as it seems that the code below does what I need:
>
>
>
> s3 = s1.join(s2)
>
> .where(new KeySelector1()).equalTo(new KeySelector2())
>
> .window(GlobalWindow.create())
>
> .trigger(CountTrigger.of(1))
>
> .apply(new JoinFunction);
>
>
>
> If that’s a common use case (in my view it is), a syntax shortcut 
> could help developers, e.g. something like:
>
>
>
> s3 = s1.join(s2)
>
> .where(new KeySelector1()).equalTo(new KeySelector2())
>
> .combineLatest(new 
> JoinFunction);
>
>
>
> Denis
>
>
>
>
>
> From: Dollfus, Denis (TR Technology & Ops)
> Sent: lundi 5 décembre 2016 12:27
> To: user@flink.apache.org
> Subject: Equivalent of Rx combineLatest() on a join?
>
>
>
> Hi all,
>
>
>
> [first email here, I’m new to Flink, Java and Scala, sorry if I missed 
> something obvious]
>
>
>
> I'm exploring Flink in the context of streaming calculators. 
> Basically, the data flow boils down to multiple data streams with 
> variable update rates (ms, seconds, …, month) which are joined before being 
> fed to calculators.
> The kind of operation I need is very similar to the Rx combineLatest 
> operator, which results in a object being emitted whenever one of the 
> streams is updated.
>
>
>
> As there is no such operator predefined, I think I have to use a 
> GlobalWindow and provide a custom WindowAssigner. The end result would 
> look like this (pseudo java 8 code, I hope it's understandable):
>
>
>
> DataStream s1 = env.addSource(..);
>
> DataStream s2 = env.addSource(..);
>
>
>
> S3 = s1.join(s2)
>
> .where(s1 -> id)
>
> .equalTo(s2 -> id)
>
> .window(new MyCustomCombineLatestAssigner())
>
> .apply( … return new object combining data from s1 and 
> from s2);
>
>
>
> Is the approach correct, or is there a simpler way to achieve the same 
> join
> + apply mechanism ?
>
>
>
> Thank you,
>
>
>
> Denis
>
>
>
>
>
>
>
> 
>
>
> This e-mail is for the sole use of the intended recipient and contains 
> information that may be privileged and/or confidential. If you are not 
> an intended recipient, please notify the sender by return e-mail and 
> delete this e-mail and any attachments. Certain required legal entity 
> disclosures can be accessed on our website.


Re: Avro Parquet/Flink/Beam

2016-12-13 Thread Jean-Baptiste Onofré

Hi Billy,

no, ParquetIO is in early stage and won't be included in 
0.4.0-incubating (that I will prepare pretty soon).


I will push the branch on my github (didn't have time yet, sorry about 
that).


Regards
JB

On 12/13/2016 05:08 PM, Newport, Billy wrote:

Is your parquetio going to be accepted in to 0.4?

Also, do you have a link to your github?


Thanks

-Original Message-
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Monday, December 12, 2016 11:49 AM
To: user@flink.apache.org
Subject: Re: Avro Parquet/Flink/Beam

Hi Billy,

I will push my branch with ParquetIO on my github.

Yes, the Beam IO is independent from the runner.

Regards
JB

On 12/12/2016 05:29 PM, Newport, Billy wrote:

I don't mind writing one, is there a fork for the ParquetIO works that's 
already been done or is it in trunk?

The ParquetIO is independent of the runner being used? Is that right?

Thanks

-Original Message-
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Monday, December 12, 2016 11:25 AM
To: user@flink.apache.org
Subject: Re: Avro Parquet/Flink/Beam

Hi,

Beam provides a AvroCoder/AvroIO that you can use, but not yet a
ParquetIO (I created a Jira about that and started to work on it).

You can use the Avro reader to populate the PCollection and then use a
custom DoFn to create the Parquet (waiting for the ParquetIO).

Regards
JB

On 12/12/2016 05:19 PM, Newport, Billy wrote:

Are there any examples showing the use of beam with avro/parquet and a
flink runner? I see an avro reader for beam, is it a matter of writing
another one for avro-parquet or does this need to use the flink
HadoopOutputFormat for example?



Thanks

Billy









--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


RE: Avro Parquet/Flink/Beam

2016-12-13 Thread Newport, Billy
Is your parquetio going to be accepted in to 0.4?

Also, do you have a link to your github?


Thanks

-Original Message-
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net] 
Sent: Monday, December 12, 2016 11:49 AM
To: user@flink.apache.org
Subject: Re: Avro Parquet/Flink/Beam

Hi Billy,

I will push my branch with ParquetIO on my github.

Yes, the Beam IO is independent from the runner.

Regards
JB

On 12/12/2016 05:29 PM, Newport, Billy wrote:
> I don't mind writing one, is there a fork for the ParquetIO works that's 
> already been done or is it in trunk?
>
> The ParquetIO is independent of the runner being used? Is that right?
>
> Thanks
>
> -Original Message-
> From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
> Sent: Monday, December 12, 2016 11:25 AM
> To: user@flink.apache.org
> Subject: Re: Avro Parquet/Flink/Beam
>
> Hi,
>
> Beam provides a AvroCoder/AvroIO that you can use, but not yet a
> ParquetIO (I created a Jira about that and started to work on it).
>
> You can use the Avro reader to populate the PCollection and then use a
> custom DoFn to create the Parquet (waiting for the ParquetIO).
>
> Regards
> JB
>
> On 12/12/2016 05:19 PM, Newport, Billy wrote:
>> Are there any examples showing the use of beam with avro/parquet and a
>> flink runner? I see an avro reader for beam, is it a matter of writing
>> another one for avro-parquet or does this need to use the flink
>> HadoopOutputFormat for example?
>>
>>
>>
>> Thanks
>>
>> Billy
>>
>>
>>
>

-- 
Jean-Baptiste Onofré
jbono...@apache.org
https://urldefense.proofpoint.com/v2/url?u=http-3A__blog.nanthrax.net=DgID-g=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q=EwGuUUxM48zoWoOis4Qf-DWNAER-A45_WBY7OJouJWQ=7-6dzKAcQozOmfL30C0Y44i2mkkAf_Vi5CxKjgWgM5Y=
 
Talend - 
https://urldefense.proofpoint.com/v2/url?u=http-3A__www.talend.com=DgID-g=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q=EwGuUUxM48zoWoOis4Qf-DWNAER-A45_WBY7OJouJWQ=B9Rvx9ad1wvy-Uc01v9S47e48k1uBZooIucUVuiZr2M=
 


Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-13 Thread Timur Shenkao
How to subscribe?
community-subscr...@flink.apache.org ?

On Tue, Dec 13, 2016 at 6:32 PM, Robert Metzger  wrote:

> The commun...@flink.apache.org has been created :)
>
> On Tue, Dec 13, 2016 at 10:43 AM, Robert Metzger 
> wrote:
>
>> +1. I've requested the community@ mailing list from infra.
>>
>> On Tue, Dec 13, 2016 at 10:40 AM, Kostas Tzoumas 
>> wrote:
>>
>>> It seems that several folks are excited about the idea - but there is
>>> still
>>> a concern on whether this would be spam for the dev@ and user@ lists
>>> (which
>>> I share)
>>>
>>> As a compromise, I propose to request a new mailing list (
>>> commun...@flink.apache.org) which we can use for this purpose, and also
>>> to
>>> post upcoming meetups, conferences, etc. In order to inform the community
>>> about this mailing list, we can cc the dev@ and user@ lists in the first
>>> months until the new mailing list has ramped up.
>>>
>>> On Fri, Dec 9, 2016 at 4:55 PM, Greg Hogan  wrote:
>>>
>>> > Google indexes the mailing list. Anyone can filter the messages to
>>> trash
>>> > in a few clicks.
>>> >
>>> > This will also be a means for the community to better understand which
>>> and
>>> > how companies are using Flink.
>>> >
>>> > On Fri, Dec 9, 2016 at 8:27 AM, Felix Neutatz 
>>> > wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> I wonder whether a mailing list is a good choice for that in general.
>>> If
>>> >> I am looking for a job I won't register for a mailing list or browse
>>> >> through the archive of one but rather search it via Google. So what
>>> about
>>> >> putting it on a dedicated site on the Web Page. This feels more
>>> intuitive
>>> >> to me and gives a better overview.
>>> >>
>>> >> Best regards,
>>> >> Felix
>>> >>
>>> >> On Dec 9, 2016 14:20, "Ufuk Celebi"  wrote:
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On 9 December 2016 at 14:13:14, Robert Metzger (rmetz...@apache.org)
>>> >> wrote:
>>> >> > I'm against using the news@ list for that.
>>> >> > The promise of the news@ list is that its low-traffic and only for
>>> >> news. If
>>> >> > we now start having job offers (and potentially some questions on
>>> them
>>> >> > etc.) it'll be a list with more than some announcements.
>>> >> > That's also the reason why the news@ list is completely moderated.
>>> >>
>>> >> I agree with Robert. I would consider that to be spam if posted to
>>> news@.
>>> >>
>>> >>
>>> >>
>>> >>
>>> >
>>>
>>
>>
>


Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-13 Thread Robert Metzger
The commun...@flink.apache.org has been created :)

On Tue, Dec 13, 2016 at 10:43 AM, Robert Metzger 
wrote:

> +1. I've requested the community@ mailing list from infra.
>
> On Tue, Dec 13, 2016 at 10:40 AM, Kostas Tzoumas 
> wrote:
>
>> It seems that several folks are excited about the idea - but there is
>> still
>> a concern on whether this would be spam for the dev@ and user@ lists
>> (which
>> I share)
>>
>> As a compromise, I propose to request a new mailing list (
>> commun...@flink.apache.org) which we can use for this purpose, and also
>> to
>> post upcoming meetups, conferences, etc. In order to inform the community
>> about this mailing list, we can cc the dev@ and user@ lists in the first
>> months until the new mailing list has ramped up.
>>
>> On Fri, Dec 9, 2016 at 4:55 PM, Greg Hogan  wrote:
>>
>> > Google indexes the mailing list. Anyone can filter the messages to trash
>> > in a few clicks.
>> >
>> > This will also be a means for the community to better understand which
>> and
>> > how companies are using Flink.
>> >
>> > On Fri, Dec 9, 2016 at 8:27 AM, Felix Neutatz 
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> I wonder whether a mailing list is a good choice for that in general.
>> If
>> >> I am looking for a job I won't register for a mailing list or browse
>> >> through the archive of one but rather search it via Google. So what
>> about
>> >> putting it on a dedicated site on the Web Page. This feels more
>> intuitive
>> >> to me and gives a better overview.
>> >>
>> >> Best regards,
>> >> Felix
>> >>
>> >> On Dec 9, 2016 14:20, "Ufuk Celebi"  wrote:
>> >>
>> >>
>> >>
>> >>
>> >> On 9 December 2016 at 14:13:14, Robert Metzger (rmetz...@apache.org)
>> >> wrote:
>> >> > I'm against using the news@ list for that.
>> >> > The promise of the news@ list is that its low-traffic and only for
>> >> news. If
>> >> > we now start having job offers (and potentially some questions on
>> them
>> >> > etc.) it'll be a list with more than some announcements.
>> >> > That's also the reason why the news@ list is completely moderated.
>> >>
>> >> I agree with Robert. I would consider that to be spam if posted to
>> news@.
>> >>
>> >>
>> >>
>> >>
>> >
>>
>
>


WindowFunction-extension, WindowedStream apply signature mismatch

2016-12-13 Thread MIkkel Islay
(The following is a cross-post of a Stack Overflow question at
https://stackoverflow.com/questions/41105142/windowfunction-extension-does-not-match-windowedstream-apply-signatures
)

Why does the below error occur?

Given this extension to WindowFunction:

  class TestWinFunc extends WindowFunction[Top, Bottom, Long, TimeWindow] {
override def apply(key: Long,
   w: TimeWindow,
   iterable: Iterable[Top],
   collector: Collector[Bottom]): Unit = {
  collector.collect(Bottom(0.0,0.0,0.0,0.0,1L))
}
  }

an *apply* transformation on a windowed stream:

val bottom = inputstream
.keyBy(_.stamp)
.window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
.apply(new TestWinFunc)

yields the following error:

Error:overloaded method value apply with alternatives: [R](function: (Long,
org.apache.flink.streaming.api.windowing.windows.TimeWindow,
Iterable[Flink.ETL.Top], org.apache.flink.util.Collector[R]) =>
Unit)(implicit evidence$4:
org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[R](function:
org.apache.flink.streaming.api.scala.function.WindowFunction[Flink.ETL.Top,R,Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit
evidence$3:
org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
cannot be applied to (Flink.ETL.TestWinFunc) .apply(new TestWinFunc)

The 'bottom' val argument to *apply* has type WindowedStream[Top, Long,
TimeWindow].

Flink 1.1.3 / Scala 2.11

Thanks,

Mikkel


Re: Bloom filter in Flink

2016-12-13 Thread Fabian Hueske
Hi Gennady,

this bloom filter is actually not distributed and only used internally as
an optimization to reduce the amount of data spilled by a hash join.
So, it is not meant to be user facing and not integrated in any API.
You could of course use the code, but there might be better implementations
for your purpose.

Best, Fabian

2016-12-13 12:34 GMT+01:00 Gennady Gilin :

> Hi Everyone,
>
>
>
> Noticed that Flink sources are contain distributed Bloom filter
> implementation
> ,
> so wandering is somebody tried to use it in production for large scale
> items ( ~2.5 billion items in my case ) and can share experience, or even
> some statistics about errors and memory consumption.
>
>
>
> Thanks,
>
> Gennady
>
>
>
>
>
>
>
>
>


Bloom filter in Flink

2016-12-13 Thread Gennady Gilin
Hi Everyone,

Noticed that Flink sources are contain distributed Bloom filter 
implementation,
 so wandering is somebody tried to use it in production for large scale items ( 
~2.5 billion items in my case ) and can share experience, or even some 
statistics about errors and memory consumption.

Thanks,
Gennady






Re: How to retrieve values from yarn.taskmanager.env in a Job?

2016-12-13 Thread Till Rohrmann
Hi Shannon,

the job graph generation does not run in the task manager but on the
client. The job graph is then submitted to the JobManager which then will
deploy the individual tasks to the TaskManager. Thus, when generating the
job graph the task manager environment variables are not accessible.

Thus, you are only able to access these environment variables from within
your UDFs.

What you could do is to union all configuration objects and then reading
only those entries relevant for a specific environment on the task manager,
e.g. open method of a RichFunction.

Cheers,
Till

On Mon, Dec 12, 2016 at 7:12 PM, Shannon Carey  wrote:

> Hi Chesnay,
>
> Since that configuration option is supposed to apply the environment
> variables to the task managers, I figured it would definitely be available
> within the stream operators. I'm not sure whether the job plan runs within
> a task manager or not, but hopefully it does?
>
> In my particular code, I want to get the name of the environment in order
> to read the correct configuration file(s) so that properly populated config
> objects can be passed to various operators. Therefore, it would be
> sufficient for the job plan execution to have access to the environment.
> All the operators are capable of persisting any necessary configuration
> through serialization.
>
> It really can work either way, but I think it'd be easiest if it was
> available everywhere. If it's only available during job planning then you
> have to make sure to serialize it everywhere you need it, and if it's only
> available during operator execution then it's less straightforward to do
> central configuration work. Either way it's lying in wait for a programmer
> to forget where it's accessible vs. not.
>
> -Shannon
>
> From: Chesnay Schepler 
> Date: Monday, December 12, 2016 at 7:36 AM
> To: 
> Subject: Re: How to retrieve values from yarn.taskmanager.env in a Job?
>
> Hello,
>
> can you clarify one small thing for me: Do you want to access this
> parameter when you define the plan
> (aka when you call methods on the StreamExecutionEnvironment or DataStream
> instances)
> or from within your functions/operators?
>
> Regards,
> Chesnay Schepler
>
>
> On 12.12.2016 14:21, Till Rohrmann wrote:
>
> Hi Shannon,
>
> have you tried accessing the environment variables via System.getenv()?
> This should give you a map of string-string key value pairs where the key
> is the environment variable name.
>
> If your values are not set in the returned map, then this indicates a bug
> in Flink and it would be great if you could open a JIRA issue.
>
> Cheers,
> Till
> ​
>
> On Fri, Dec 9, 2016 at 7:33 PM, Shannon Carey  wrote:
>
>> This thread http://apache-flink-user-mailing-list-archive.2336050
>> .n4.nabble.com/passing-environment-variables-to-flink-program-td3337.html 
>> describes
>> the impetus for the addition of yarn.taskmanager.env.
>>
>> I have configured a value within yarn.taskmanager.env, and I see it
>> appearing in the Flink web UI in the list underneath Job Manager ->
>> Configuration. However, I can't figure out how to retrieve the value from
>> within a Flink job. It doesn't appear in the environment, the system
>> properties, or my ParameterTool instance, and I can't figure out how I
>> would get to it via the StreamExecutionEnvironment. Can anyone point me in
>> the right direction?
>>
>> All I want to do is inform my Flink jobs which environment they're
>> running on, so that programmers don't have to specify the environment as a
>> job parameter every time they run it. I also see that there is a
>> "env.java.opts" configuration… does that work in YARN apps (would my jobs
>> be able to see it?)
>>
>> Thanks!
>> Shannon
>>
>
>
>


Re: [DISCUSS] "Who's hiring on Flink" monthly thread in the mailing lists?

2016-12-13 Thread Robert Metzger
+1. I've requested the community@ mailing list from infra.

On Tue, Dec 13, 2016 at 10:40 AM, Kostas Tzoumas 
wrote:

> It seems that several folks are excited about the idea - but there is still
> a concern on whether this would be spam for the dev@ and user@ lists
> (which
> I share)
>
> As a compromise, I propose to request a new mailing list (
> commun...@flink.apache.org) which we can use for this purpose, and also to
> post upcoming meetups, conferences, etc. In order to inform the community
> about this mailing list, we can cc the dev@ and user@ lists in the first
> months until the new mailing list has ramped up.
>
> On Fri, Dec 9, 2016 at 4:55 PM, Greg Hogan  wrote:
>
> > Google indexes the mailing list. Anyone can filter the messages to trash
> > in a few clicks.
> >
> > This will also be a means for the community to better understand which
> and
> > how companies are using Flink.
> >
> > On Fri, Dec 9, 2016 at 8:27 AM, Felix Neutatz 
> > wrote:
> >
> >> Hi,
> >>
> >> I wonder whether a mailing list is a good choice for that in general. If
> >> I am looking for a job I won't register for a mailing list or browse
> >> through the archive of one but rather search it via Google. So what
> about
> >> putting it on a dedicated site on the Web Page. This feels more
> intuitive
> >> to me and gives a better overview.
> >>
> >> Best regards,
> >> Felix
> >>
> >> On Dec 9, 2016 14:20, "Ufuk Celebi"  wrote:
> >>
> >>
> >>
> >>
> >> On 9 December 2016 at 14:13:14, Robert Metzger (rmetz...@apache.org)
> >> wrote:
> >> > I'm against using the news@ list for that.
> >> > The promise of the news@ list is that its low-traffic and only for
> >> news. If
> >> > we now start having job offers (and potentially some questions on them
> >> > etc.) it'll be a list with more than some announcements.
> >> > That's also the reason why the news@ list is completely moderated.
> >>
> >> I agree with Robert. I would consider that to be spam if posted to news@
> .
> >>
> >>
> >>
> >>
> >
>


Re: Equivalent of Rx combineLatest() on a join?

2016-12-13 Thread Gábor Gévay
Dear Denis,

I think you can do it with a simple CoFlatMapFunction (without windows):
To use a CoFlatMapFunction, you need to first connect [1] your
streams, which results in a ConnectedStreams. Then you can call
flatMap on this, and give a CoFlatMapFunction to it (where two
different callbacks are executed when an element arrives on one of the
two streams). What you could do, is to have two members in your
CoFlatMapFunction that store the latest values from the two streams,
and you update the appropriate one when an element arrives and also
emit a combined value from them.

Best,
Gábor

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#connect-org.apache.flink.streaming.api.datastream.DataStream-




2016-12-05 18:28 GMT+01:00  :
> Actually that doesn’t work as expected because emitted values are not
> purged. I’ll experiment with purging triggers and/or evictors, though I have
> the feeling that Flink was not designed for what we need to do here -- but
> I’ll keep on searching.
>
>
>
> In the meantime any advice is appreciated. If the goal is not clear I can
> provide more details.
>
>
>
> Thank you,
>
>
>
> Denis
>
>
>
> From: Dollfus, Denis (TR Technology & Ops)
> Sent: lundi 5 décembre 2016 16:31
> To: user@flink.apache.org
> Subject: RE: Equivalent of Rx combineLatest() on a join?
>
>
>
> Asking the response helped me to find the answer (yes, rubber duck
> debugging) as it seems that the code below does what I need:
>
>
>
> s3 = s1.join(s2)
>
> .where(new KeySelector1()).equalTo(new KeySelector2())
>
> .window(GlobalWindow.create())
>
> .trigger(CountTrigger.of(1))
>
> .apply(new JoinFunction);
>
>
>
> If that’s a common use case (in my view it is), a syntax shortcut could help
> developers, e.g. something like:
>
>
>
> s3 = s1.join(s2)
>
> .where(new KeySelector1()).equalTo(new KeySelector2())
>
> .combineLatest(new JoinFunction);
>
>
>
> Denis
>
>
>
>
>
> From: Dollfus, Denis (TR Technology & Ops)
> Sent: lundi 5 décembre 2016 12:27
> To: user@flink.apache.org
> Subject: Equivalent of Rx combineLatest() on a join?
>
>
>
> Hi all,
>
>
>
> [first email here, I’m new to Flink, Java and Scala, sorry if I missed
> something obvious]
>
>
>
> I'm exploring Flink in the context of streaming calculators. Basically, the
> data flow boils down to multiple data streams with variable update rates
> (ms, seconds, …, month) which are joined before being fed to calculators.
> The kind of operation I need is very similar to the Rx combineLatest
> operator, which results in a object being emitted whenever one of the
> streams is updated.
>
>
>
> As there is no such operator predefined, I think I have to use a
> GlobalWindow and provide a custom WindowAssigner. The end result would look
> like this (pseudo java 8 code, I hope it's understandable):
>
>
>
> DataStream s1 = env.addSource(..);
>
> DataStream s2 = env.addSource(..);
>
>
>
> S3 = s1.join(s2)
>
> .where(s1 -> id)
>
> .equalTo(s2 -> id)
>
> .window(new MyCustomCombineLatestAssigner())
>
> .apply( … return new object combining data from s1 and from
> s2);
>
>
>
> Is the approach correct, or is there a simpler way to achieve the same join
> + apply mechanism ?
>
>
>
> Thank you,
>
>
>
> Denis
>
>
>
>
>
>
>
> 
>
>
> This e-mail is for the sole use of the intended recipient and contains
> information that may be privileged and/or confidential. If you are not an
> intended recipient, please notify the sender by return e-mail and delete
> this e-mail and any attachments. Certain required legal entity disclosures
> can be accessed on our website.