org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl

2017-03-23 Thread rimin515
Hi, I read a file from HDFS, but there is an error when I run the job on a YARN cluster:

val dataSeg = env.readTextFile("hdfs:///user/hadoop/text")
  .filter(!_.startsWith("#"))
  .map { x =>
    val values = x.split("\t")
    (values(0), values(1).split(" "))
  }

logger.info("dataSeg=" + dataSeg.count())
The error is the following:

2017-03-24 11:32:15,012 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted while waiting for queue
java.lang.InterruptedException
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
  at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
  at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)

Hadoop is 2.6, Flink is 1.1.0-hadoop2.6-scala-2.11 (the org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl class is in flink-shaded-hadoop2-1.1.0).


Re: Question Regarding a sink..

2017-03-23 Thread Tzu-Li (Gordon) Tai
Hi Steve,

This normally shouldn’t happen, unless there simply is two copies of the data.

What is the source of the topology? Also, this might be obvious, but if you 
have broadcasted your input stream to the sink, then each sink instance would 
then get all records in the input stream.
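
For illustration, a minimal Scala sketch of the difference; the stream name and the InfluxSink construction here are placeholders, not taken from your code:

// Default partitioning: each record goes to exactly one parallel sink instance.
events.addSink(new InfluxSink()).setParallelism(2)

// Explicit broadcast: every parallel sink instance receives every record,
// so with a parallelism of two each point would be written twice.
events.broadcast.addSink(new InfluxSink()).setParallelism(2)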

Cheers,
Gordon

On March 24, 2017 at 9:11:35 AM, Steve Jerman (st...@kloudspot.com) wrote:

Hi,

I have a sink writing data to InfluxDB. I’ve noticed that the sink gets
multiple copies of upstream records.

Why does this happen, and how can I avoid it?

Below is a trace showing two records (I have a parallelism of two) for each
record in the ‘.printToError’ output for the same stream.

Any help/suggestions appreciated.

Steve


1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=8F:13:AC:4A:DA:93, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=43:7D:8A:D4:7D:D7, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=77:CD:BD:48:EE:D8, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 

Question Regarding a sink..

2017-03-23 Thread Steve Jerman
Hi,

I have a sink writing data to InfluxDB. I’ve noticed that the sink gets
multiple copies of upstream records.

Why does this happen, and how can I avoid it?

Below is a trace showing two records (I have a parallelism of two) for each
record in the ‘.printToError’ output for the same stream.

Any help/suggestions appreciated.

Steve


1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=8F:13:AC:4A:DA:93, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=43:7D:8A:D4:7D:D7, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=77:CD:BD:48:EE:D8, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
18:41:27,352 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, 

Re: deploying flink cluster in AWS - Containerized

2017-03-23 Thread Chakravarthy varaga
I'm looking forward to hearing some updates on this...

Any help here is highly appreciated !!

On Thu, Mar 23, 2017 at 4:20 PM, Chakravarthy varaga <
chakravarth...@gmail.com> wrote:

> Hi Team,
>
>  We are doing a PoC to deploy Flink cluster on AWS. All runtime
> components will be dockerized.
>
>   I have a few questions in relation to discovery & security:
>
>   1. How does the Job Manager discover Task Managers? Do they talk over
> TCP?
>
>   2. If the runtime components (TM, JM) are containerized, how are the
> IPs resolved dynamically? Basically, do I have to configure the JM with the
> hostnames of the TMs? If so, and the TMs are on ephemeral IPs, how does the
> job manager learn a TM's IP/host after that TM restarts? Before I go into
> DNS and subnets, I'd like to understand how they discover & talk to each
> other!
>
>   3. I went through some Flink materials on the web on security,
> precisely on Kerberos. However, how do I ensure that user-level
> authentication is applied to job management? For example, that only certain
> users are allowed to start/stop jobs? This question applies when Flink
> is deployed as a standalone cluster.
>
> Thanks & Regards
> CVP
>


Re: flink-1.2 and unit testing / flinkspector

2017-03-23 Thread Tarandeep Singh
Hi Nancy,

I also get 1 test failed when I build/run tests on flink-spector:

- should stop if all triggers fire
Run completed in 3 seconds, 944 milliseconds.
Total number of tests run: 19
Suites: completed 5, aborted 0
Tests: succeeded 18, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***

I didn't spend time on it and proceeded to build the project by ignoring
the failed test.

Thanks,
Tarandeep


On Thu, Mar 23, 2017 at 11:16 AM, Ted Yu  wrote:

> Nancy:
> You can start another thread for the failed unit tests.
>
> You can pass "-DskipTests" to get over the install command.
>
> Cheers
>
> On Thu, Mar 23, 2017 at 11:06 AM, Nancy Estrada  > wrote:
>
>> Hi Tarandeep and Ted,
>>
>> I am in this route now. I am trying to use Flinkspector with Flink 1.2
>> using
>> your instructions but failing miserably. After applying the changes, when
>> I
>> try to run "mvn clean install", some Tests fail and therefore I am not
>> able
>> to build successfully.
>>
>> I am wondering if there is a pull request, where I can access to a version
>> of Flinkspector working with Flink 1.2.
>>
>> Thank you,
>> Nancy
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/flink-1-2-and-unit-testing-flinkspector-tp12281p12377.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: flink-1.2 and unit testing / flinkspector

2017-03-23 Thread Ted Yu
Nancy:
You can start another thread for the failed unit tests.

You can pass "-DskipTests" to get over the install command.
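
For example, assuming a standard Maven build:

mvn clean install -DskipTests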

Cheers

On Thu, Mar 23, 2017 at 11:06 AM, Nancy Estrada 
wrote:

> Hi Tarandeep and Ted,
>
> I am in this route now. I am trying to use Flinkspector with Flink 1.2
> using
> your instructions but failing miserably. After applying the changes, when I
> try to run "mvn clean install", some Tests fail and therefore I am not able
> to build successfully.
>
> I am wondering if there is a pull request, where I can access to a version
> of Flinkspector working with Flink 1.2.
>
> Thank you,
> Nancy
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/flink-1-2-and-unit-testing-flinkspector-tp12281p12377.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: flink-1.2 and unit testing / flinkspector

2017-03-23 Thread Nancy Estrada
Hi Tarandeep and Ted,

I am in this route now. I am trying to use Flinkspector with Flink 1.2 using
your instructions but failing miserably. After applying the changes, when I
try to run "mvn clean install", some Tests fail and therefore I am not able
to build successfully. 

I am wondering if there is a pull request, where I can access to a version
of Flinkspector working with Flink 1.2.

Thank you,
Nancy



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/flink-1-2-and-unit-testing-flinkspector-tp12281p12377.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Alternatives to FLINK_CONF_DIR, HADOOP_CONF_DIR, YARN_CONF_DIR

2017-03-23 Thread Foster, Craig
Thanks! I looked at the config.sh file, which works with both the
configuration file and these environment variables. After inspecting it, it
doesn’t make sense to set FLINK_CONF_DIR in that config file, since that
variable determines where we look for the file in the first place.

However, I thought I could add a patch to this shell script to at least
allow someone to set the Hadoop and YARN configuration directories in the
config file.

From: "Bajaj, Abhinav" 
Date: Thursday, March 23, 2017 at 10:42 AM
To: "Foster, Craig" 
Cc: "user@flink.apache.org" 
Subject: Re: Alternatives to FLINK_CONF_DIR, HADOOP_CONF_DIR, YARN_CONF_DIR

I think the FLINK_CONF_DIR points to the conf directory. This is the place 
where the Flink CLI looks for the flink-conf.yaml file.

I think there is an alternate option for HADOOP_CONF_DIR, YARN_CONF_DIR but I 
am not sure.
Check this 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#common-options
Config option - “fs.hdfs.hadoopconf:“

~ Abhi


From: "Foster, Craig" 
Reply-To: "user@flink.apache.org" 
Date: Thursday, March 23, 2017 at 9:43 AM
To: "user@flink.apache.org" 
Subject: Alternatives to FLINK_CONF_DIR, HADOOP_CONF_DIR, YARN_CONF_DIR

Can I set these in the configuration file? This would be ideal vs. environment 
variables for me but I’m not seeing it in the documentation.

Thanks,
Craig



Re: Alternatives to FLINK_CONF_DIR, HADOOP_CONF_DIR, YARN_CONF_DIR

2017-03-23 Thread Bajaj, Abhinav
I think the FLINK_CONF_DIR points to the conf directory. This is the place 
where the Flink CLI looks for the flink-conf.yaml file.

I think there is an alternate option for HADOOP_CONF_DIR, YARN_CONF_DIR but I 
am not sure.
Check this 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#common-options
Config option - “fs.hdfs.hadoopconf:“
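
For example, in flink-conf.yaml (the directory below is only a placeholder):

fs.hdfs.hadoopconf: /path/to/etc/hadoop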

~ Abhi


From: "Foster, Craig" 
Reply-To: "user@flink.apache.org" 
Date: Thursday, March 23, 2017 at 9:43 AM
To: "user@flink.apache.org" 
Subject: Alternatives to FLINK_CONF_DIR, HADOOP_CONF_DIR, YARN_CONF_DIR

Can I set these in the configuration file? This would be ideal vs. environment 
variables for me but I’m not seeing it in the documentation.

Thanks,
Craig



Re: Cogrouped Stream never triggers tumbling event time window

2017-03-23 Thread Andrea Spina
Sorry, I forgot to put the Flink version. 1.1.2

Thanks, Andrea





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cogrouped-Stream-never-triggers-tumbling-event-time-window-tp12373p12374.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Cogrouped Stream never triggers tumbling event time window

2017-03-23 Thread Andrea Spina
Dear Community,

I'm really struggling on a co-grouped stream. The workload is the following:

val firstStream: DataStream[FirstType] =
  firstRaw.assignTimestampsAndWatermarks(new MyCustomFirstExtractor(maxOutOfOrder))

val secondStream: DataStream[SecondType] = secondRaw
  .assignTimestampsAndWatermarks(new MyCustomSecondExtractor(maxOutOfOrder))
  .map(new toSecondsStreamMapper())

where both the Extractors extend BoundedOutOfOrdernessTimestampExtractor by
overriding the extractTimestamp method and assigning timestamps owned
respectively by FirstType and SecondType objects.

override def extractTimestamp(first: FirstType): Long = first.timestamp
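
For reference, a minimal sketch of what such an extractor can look like in the Scala API, assuming FirstType exposes a timestamp field as above:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

class MyCustomFirstExtractor(maxOutOfOrder: Time)
    extends BoundedOutOfOrdernessTimestampExtractor[FirstType](maxOutOfOrder) {
  // Watermarks emitted by this extractor lag maxOutOfOrder behind the
  // largest timestamp seen so far.
  override def extractTimestamp(first: FirstType): Long = first.timestamp
}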

Then I'm calling cogroup as follows

val stockDetails = firstStream
  .coGroup(secondStream)
  .where(_.id)
  .equalTo(_.id)
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .apply(new MyCogroupFunction())
  .uid("myCogroup")
  .name("My CoGroup")

The problem is that the CoGroup function is never triggered. I ran several
tests and was not able to solve it at all.
The first relevant point is that event time can be seriously out of order; I
can even run into 0 timestamps. I also tried faking the timestamps so that
they are spread over a span of two seconds, five seconds, and so forth. These
attempts didn't change the behavior at all: no window is ever fired.

Another relevant point: I'm running locally, reading from a pre-loaded Kafka
topic, so all the events are read sequentially at startup.

I will give a couple of examples.

Workload 1 (faked timestamps)
fields (id, timestamp)
FirstType(9781783433803 ,1490280129517)
FirstType(9781783433803 ,1490280129517)
FirstType(9781783433803 ,1490280131191)
FirstType(9781783433803 ,1490280131191)
FirstType(9781783433803 ,1490280131214)
FirstType(9781783433803 ,1490280131214)
FirstType(9781783433803 ,1490280131250)
FirstType(9781783433803 ,1490280131250)
FirstType(9781783433803 ,1490280131294)
FirstType(9781783433803 ,1490280131294)
FirstType(9781783433803 ,1490280131328)
FirstType(9781783433803 ,1490280131328)

SecondType(9781783433803,1490280130465)
SecondType(9781783433803,1490280131027)
SecondType(9781783433803,1490280131051)
SecondType(9781783433803,1490280131070)
SecondType(9781783433803,1490280131085)
SecondType(9781783433803,1490280131103)
SecondType(9781783433803,1490280131124)
SecondType(9781783433803,1490280131143)
SecondType(9781783433803,1490280131158)
SecondType(9781783433803,1490280131175)

Workload 2 (real case timestamps)

> FirstType(9781783433803, 1490172958602)
1> FirstType(9781783433803, ,1490172958611)
1> FirstType(9781783433803, 1490172958611)
1> FirstType(9781783433803, 1490172958620)
1> FirstType(9781783433803, 1490172958620)
1> FirstType(9781783433803 ,1490196171869)
1> FirstType(9781783433803, 1490196171869)

SecondType(9781783433803 ,0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 1488834670490)
SecondType(9781783433803, 1489577984143)
SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 1488834670490)
SecondType(9781783433803, 1489577984143)
SecondType(9781783433803, 1489689399726)
SecondType(9781783433803, 1489689399726)

I confirm that I have healthy incoming streams at the entrance of the
coGroup operator.
I think I'm likely missing something easy.

Any help will be really appreciated.

Sincerly,

Andrea




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cogrouped-Stream-never-triggers-tumbling-event-time-window-tp12373.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Alternatives to FLINK_CONF_DIR, HADOOP_CONF_DIR, YARN_CONF_DIR

2017-03-23 Thread Foster, Craig
Can I set these in the configuration file? This would be ideal vs. environment 
variables for me but I’m not seeing it in the documentation.

Thanks,
Craig



deploying flink cluster in AWS - Containerized

2017-03-23 Thread Chakravarthy varaga
Hi Team,

 We are doing a PoC to deploy Flink cluster on AWS. All runtime
components will be dockerized.

  I have a few questions in relation to discovery & security:

  1. How does the Job Manager discover Task Managers? Do they talk over
TCP?

  2. If the runtime components (TM, JM) are containerized, how are the IPs
resolved dynamically? Basically, do I have to configure the JM with the
hostnames of the TMs? If so, and the TMs are on ephemeral IPs, how does the
job manager learn a TM's IP/host after that TM restarts? Before I go into
DNS and subnets, I'd like to understand how they discover & talk to each
other!

   3. I went through some Flink materials on the web on security,
precisely on Kerberos. However, how do I ensure that user-level
authentication is applied to job management? For example, that only certain
users are allowed to start/stop jobs? This question applies when Flink is
deployed as a standalone cluster.

Thanks & Regards
CVP


Re: [E] Re: unable to add more servers in zookeeper quorum peers in flink 1.2

2017-03-23 Thread Greg Hogan
A PR and review may have noted that the same regex is in 
stop-zookeeper-quorum.sh and recommended ignoring whitespace both before and 
after the equals sign.


> On Mar 23, 2017, at 10:12 AM, Robert Metzger  wrote:
> 
> Thank you for verifying. Fixed in master: 
> http://git-wip-us.apache.org/repos/asf/flink/commit/3e860b40 
> 
> 
> On Wed, Mar 22, 2017 at 9:25 PM,  > wrote:
> That worked.. Thanks Chesnay,
> 
>  
> 
> 
>  
> 
> Kanagaraj Vengidasamy
> RTCI
> 
> 7701 E Telecom PKWY
> Temple Terrace, FL 33637
> 
> O 813.978.4372 | M 813.455.9757
> 
>     
>    
> 
>  
> 
> From: Chesnay Schepler [mailto:ches...@apache.org 
> ] 
> Sent: Wednesday, March 22, 2017 11:23 AM
> To: user@flink.apache.org 
> Subject: Re: [E] Re: unable to add more servers in zookeeper quorum peers in 
> flink 1.2
> 
>  
> 
> I guess that's because the grouping is wrong.
> 
> ^server\.([0-9])+[[:space:]]*\=([^: \#]+)
> should probably be
> 
> ^server\.([0-9]+)[[:space:]]*\=([^: \#]+)
> Could you modify the .sh script as such and try again?
> 
> Regards,
> Chesnay
> 
> On 22.03.2017 16:10, kanagaraj.vengidas...@verizon.com 
>  wrote:
> 
> Greg,
> 
>  
> 
> Sorry about that.. when I copy the config and replaced the real server names 
> . I don’t have space in my configurations. The  issue is when I have 
> server.10=, server.11=
> 
> And tried to start the zookeeper,  Myid replaced with 0 and 1 for these 
> servers which is conflicting with server.0 and server.1
> 
>  
> 
>  
> 
> Thanks
> 
> 
>  
> 
> Kanagaraj Vengidasamy
> RTCI
> 
> 7701 E Telecom PKWY
> Temple Terrace, FL 33637
> 
> O 813.978.4372 | M 813.455.9757
> 
>  
> 
>
> 
>
> 
>  
> 
> From: Greg Hogan [mailto:c...@greghogan.com ] 
> Sent: Wednesday, March 22, 2017 10:08 AM
> To: user@flink.apache.org 
> Subject: [E] Re: unable to add more servers in zookeeper quorum peers in 
> flink 1.2
> 
>  
> 
> Kanagaraj,
> 
>  
> 
> None of the server lines are matching since the regex in 
> start-zookeeper-quorum.sh does not allow for spaces after the equals sign.
> 
>   ^server\.([0-9])+[[:space:]]*\=([^: \#]+)
> 
>  
> 
> Greg
> 
>  
> 
> On Mar 22, 2017, at 8:49 AM, kanagaraj.vengidas...@verizon.com 
>  wrote:
> 
>  
> 
> Hi All,
> 
>  
> 
> We are using flink 1.2  .  Unable to add more than one digit in server.x.  
> when I have more than one digit it is not allowing to start the zookeeper.  
> What I need to do if I want to keep more servers?
> 
>  
> 
> # ZooKeeper quorum peers
> 
> server.0=server1:2888:3888
> 
> server.1=server2:2888:3888
> 
> server.2=server3:2888:3888
> 
> server.3=server4:2888:3888
> 
> server.4=server5:2888:3888
> 
> server.5=server6:2888:3888
> 
> server.6=server7:2888:3888
> 
> server.7=server8:2888:3888
> 
> server.8=server9:2888:3888
> 
> server.9=server10:2888:3888
> 
> #server.10=server11:2888:3888
> 
> #server.11=server12:2888:3888
> 
>  
> 
> Thanks
> 
> Kanagaraj
> 
>  
> 
>  
> 
> 



Re: Benchmarking streaming frameworks

2017-03-23 Thread Dominik Safaric
Dear Giselle,

Various stream processing engine benchmarks already exist. Here are a few I
believe are worth mentioning:

http://ieeexplore.ieee.org/document/7530084/
https://www.usenix.org/node/188989
https://pdfs.semanticscholar.org/c82f/170fbc837291d94dc0a18f0223d182144339.pdf
https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F14/projects/reports/project11_report_ver3.pdf
https://hal.inria.fr/hal-01347638/document

Regards,
Dominik

> On 23 Mar 2017, at 11:09, Giselle van Dongen  
> wrote:
> 
> Dear users of Streaming Technologies,
> 
> As a PhD student in big data analytics, I am currently in the process of
> compiling a list of benchmarks (to test multiple streaming frameworks) in
> order to create an expanded benchmarking suite. The benchmark suite is being
> developed as a part of my current work at Ghent University.
> 
> The included frameworks at this time are, in no particular order, Spark,
> Flink, Kafka (Streams), Storm (Trident) and Drizzle. Any pointers to
> previous work or relevant benchmarks would be appreciated.
> 
> Best regards,
> Giselle van Dongen



keyBy doesn't evenly distribute keys

2017-03-23 Thread Dongwon Kim
Hi all,

I want keyBy() to evenly distribute records over operator subtasks
especially for a small number of keys.

I execute a test code (see below if interested) with varying numbers of
keys while setting parallelism to 5.
The key assignment to subtasks is as follows:
- 5 keys over 5 subtasks   : each subtask with 3, 1, 1, 0, 0 keys,
respectively
- 10 keys over 5 subtasks : each subtask with 4, 3, 1, 1, 1 keys,
respectively
- 20 keys over 5 subtasks : each subtask with 6, 5, 4, 3, 2, respectively
- 30 keys over 5 subtasks : each subtask with 8, 7, 6, 6, 3, respectively
- 40 keys over 5 subtasks : each subtask with 11, 10, 8, 3, 3, respectively
- 50 keys over 5 subtasks : each subtask with 13, 11, 10, 9 ,7, respectively
I repeated the test for each setting, and found that the key assignment is
deterministic when # keys and # subtasks are fixed.

I manage to do that with customPartition().
But what I really want is to get a keyed stream and to apply a window
function to a sliding window (not shown in the below code though)

I found that Stephan once suggested to generate a special key to be used by
keyBy() as shown in http://apache-flink-user-mailing-list-archive.
2336050.n4.nabble.com/keyBy-using-custom-partitioner-td5379.html,
but I cannot find any example about it.
How can I generate a special key in order to evenly distribute keys to
operator subtasks?

Otherwise, is there another way of evenly distributing keys?

- Dongwon Kim

 test code -
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.collection.mutable

case class Data(time: Int)

object FlinkTest extends App {
  val parallelism = 5
  val numberOfKeys = 5 // 5, 10, 20, 30, 40, 50
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(parallelism)

  env
    .addSource(
      new RichSourceFunction[(Int, Data)] {
        var running = false
        override def open(parameters: Configuration): Unit = { running = true }
        override def run(ctx: SourceContext[(Int, Data)]): Unit = {
          val iter = (1 to 1).iterator
          while (running && iter.hasNext) {
            val t = iter.next
            (1 to numberOfKeys) foreach { key =>
              ctx.collect((key, new Data(t)))
            }
          }
          running = false
        }
        override def cancel(): Unit = { running = false }
      }
    )
    .keyBy(0)
    //.partitionCustom(
    //  new Partitioner[Int](){
    //    override def partition(key: Int, numPartitions: Int): Int = key % numPartitions
    //  },
    //  _._1
    //)
    .addSink(
      new RichSinkFunction[(Int, Data)] {

        var counts: mutable.HashMap[Int, Int] = _

        override def open(parameters: Configuration): Unit = {
          counts = new mutable.HashMap()
        }

        override def invoke(record: (Int, Data)) = {
          val key = record._1
          val cnt = counts.getOrElseUpdate(key, 0)
          counts.update(key, cnt + 1)
        }

        override def close(): Unit = {
          println(s"close : ${counts.size} $counts")
        }
      }
    )
  env.execute()
}


Re: Task manager number mismatch container number on mesos

2017-03-23 Thread Robert Metzger
Could you provide the logs of the task manager that still runs as a
container but doesn't show up as a Taskmanager?

On Thu, Mar 23, 2017 at 11:38 AM, Renjie Liu 
wrote:

> Permanent. I've waited for several minutes and the task manager is still
> lost.
>
> On Thu, Mar 23, 2017 at 6:34 PM Ufuk Celebi  wrote:
>
>> When it happens, is it temporary or permanent?
>>
>> Looping in Till and Eron who worked on the Mesos runner.
>>
>> – Ufuk
>>
>> On Thu, Mar 23, 2017 at 11:09 AM, Renjie Liu 
>> wrote:
>> > Hi, all:
>> > We are using flink 1.2.0 on mesos. We found the number of task managers
>> > mismatches with container number occasinally. That's the mesos container
>> > still exists but it can't be found on the monitor web page of flink
>> master.
>> > This case doesn't happen frequently and it's hard to reproduce.
>> > --
>> > Liu, Renjie
>> > Software Engineer, MVAD
>>
> --
> Liu, Renjie
> Software Engineer, MVAD
>


Re: Task manager number mismatch container number on mesos

2017-03-23 Thread Renjie Liu
I'm not sure how to reproduce this bug, and I'll post it next time it
happens.

On Thu, Mar 23, 2017 at 10:21 PM Robert Metzger  wrote:

> Could you provide the logs of the task manager that still runs as a
> container but doesn't show up as a Taskmanager?
>
> On Thu, Mar 23, 2017 at 11:38 AM, Renjie Liu 
> wrote:
>
> Permanent. I've waited for several minutes and the task manager is still
> lost.
>
> On Thu, Mar 23, 2017 at 6:34 PM Ufuk Celebi  wrote:
>
> When it happens, is it temporary or permanent?
>
> Looping in Till and Eron who worked on the Mesos runner.
>
> – Ufuk
>
> On Thu, Mar 23, 2017 at 11:09 AM, Renjie Liu 
> wrote:
> > Hi, all:
> > We are using flink 1.2.0 on mesos. We found the number of task managers
> > mismatches with container number occasinally. That's the mesos container
> > still exists but it can't be found on the monitor web page of flink
> master.
> > This case doesn't happen frequently and it's hard to reproduce.
> > --
> > Liu, Renjie
> > Software Engineer, MVAD
>
> --
> Liu, Renjie
> Software Engineer, MVAD
>
>
> --
Liu, Renjie
Software Engineer, MVAD


Re: Odd error

2017-03-23 Thread Robert Metzger
Hi,

I assume the flatMap(new RecordSplit()) is emitting a RawRecord.
Is it possible that you've also added an empty constructor to it while
adding the compareTo() method?

I think the problem is that one of your types (probably the schema) is
recognized as a nested POJO.
Check out this documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/api_concepts.html#define-keys-using-field-expressions
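
For reference, field expressions such as keyBy("id", "keyByHelper", "schema") only work on composite types (tuples, case classes, and POJOs); in the Java API this means the record class must be a valid POJO: a public class with a public no-argument constructor and public fields (or getters/setters) for every keyed field. A minimal Scala sketch of a type for which the field-expression keyBy works; the class and the stream are illustrative, not taken from the code below:

case class SchemaRecord(id: String, keyByHelper: Long, schema: String)

// records: DataStream[SchemaRecord]
// SchemaRecord is a composite (case class) type, so field expressions are allowed:
val keyed = records.keyBy("id", "keyByHelper", "schema")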


On Thu, Mar 23, 2017 at 4:35 AM, Telco Phone  wrote:

>
> Getting this:
>
> DataStream stream =
>     env.addSource(new FlinkKafkaConsumer08<>("raw", schema, properties))
>        .setParallelism(30)
>        .flatMap(new RecordSplit()).setParallelism(30)
>        .name("Raw splitter")
>        .keyBy("id", "keyByHelper", "schema");
>
> Field expression must be equal to '*' or '_' for non-composite types.
> org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
> org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:273)
> com.company.ingest.stream.RawRecord.main(RawRecord.java:38)
>
> I did add a new long compare:
>
> @Override
> public int compareTo(SchemaRecord o) {
>     return Long.compare(this.keyByHelper, o.keyByHelper);
> }
>
> I can't seem to get past this error...
>
>
>
>


Re: [E] Re: unable to add more servers in zookeeper quorum peers in flink 1.2

2017-03-23 Thread Robert Metzger
Thank you for verifying. Fixed in master:
http://git-wip-us.apache.org/repos/asf/flink/commit/3e860b40

On Wed, Mar 22, 2017 at 9:25 PM,  wrote:

> That worked.. Thanks Chesnay,
>
>
>
>
>
> Kanagaraj Vengidasamy
> RTCI
>
> 7701 E Telecom PKWY
> Temple Terrace, FL 33637
>
> O 813.978.4372 | M 813.455.9757
>
>
>
>
> *From:* Chesnay Schepler [mailto:ches...@apache.org]
> *Sent:* Wednesday, March 22, 2017 11:23 AM
> *To:* user@flink.apache.org
> *Subject:* Re: [E] Re: unable to add more servers in zookeeper quorum
> peers in flink 1.2
>
>
>
> I guess that's because the grouping is wrong.
>
> ^server\.*([0-9])+*[[:space:]]*\=([^: \#]+)
>
> should probably be
>
> ^server\.*([0-9]+)*[[:space:]]*\=([^: \#]+)
>
> Could you modify the .sh script as such and try again?
>
> Regards,
> Chesnay
>
> On 22.03.2017 16:10, kanagaraj.vengidas...@verizon.com wrote:
>
> Greg,
>
>
>
> Sorry about that.. when I copy the config and replaced the real server
> names . I don’t have space in my configurations. The  issue is when I have 
> server.10=,
> server.11=
>
> And tried to start the zookeeper,  Myid replaced with 0 and 1 for these
> servers which is conflicting with server.0 and server.1
>
>
>
>
>
> Thanks
>
>
>
> Kanagaraj Vengidasamy
> RTCI
>
> 7701 E Telecom PKWY
> Temple Terrace, FL 33637
>
> O 813.978.4372 | M 813.455.9757
>
>
>
>
> *From:* Greg Hogan [mailto:c...@greghogan.com ]
> *Sent:* Wednesday, March 22, 2017 10:08 AM
> *To:* user@flink.apache.org
> *Subject:* [E] Re: unable to add more servers in zookeeper quorum peers
> in flink 1.2
>
>
>
> Kanagaraj,
>
>
>
> None of the server lines are matching since the regex in
> start-zookeeper-quorum.sh does not allow for spaces after the equals sign.
>
>   ^server\.([0-9])+[[:space:]]*\=([^: \#]+)
>
>
>
> Greg
>
>
>
> On Mar 22, 2017, at 8:49 AM, kanagaraj.vengidas...@verizon.com wrote:
>
>
>
> Hi All,
>
>
>
> We are using flink 1.2  .  Unable to add more than one digit in server.x.
> when I have more than one digit it is not allowing to start the zookeeper.
> What I need to do if I want to keep more servers?
>
>
>
> # ZooKeeper quorum peers
>
> server.0=server1:2888:3888
>
> server.1=server2:2888:3888
>
> server.2=server3:2888:3888
>
> server.3=server4:2888:3888
>
> server.4=server5:2888:3888
>
> server.5=server6:2888:3888
>
> server.6=server7:2888:3888
>
> server.7=server8:2888:3888
>
> server.8=server9:2888:3888
>
> server.9=server10:2888:3888
>
> #server.10=server11:2888:3888
>
> #server.11=server12:2888:3888
>
>
>
> Thanks
>
> Kanagaraj
>
>
>
>
>


Re: Flink AUR package is available

2017-03-23 Thread Robert Metzger
Amazing, thanks a lot!

On Thu, Mar 23, 2017 at 10:36 AM, Tao Meng  wrote:

> Hi all,
>
>   For arch linux users, I have created flink AUR package.
> We can use the package manager to install flink and use the systemd
> manager flink as service.
> If you have any questions or suggestions please feel free to contact me.
>
>
> $ yaourt -S apache-flink
> $ systemctl start apache-flink-jobmanager.service
> $ systemctl start apache-flink-taskmanager@1.service
>
>
>  Thanks.
>


flink Broadcast

2017-03-23 Thread rimin515
Hi all,

I have 36000 documents, and each document is transformed into a vector. One
document is one vector, and all vectors have the same dimension, so I have a
DataSet:

val data: DataSet[(String, SparseVector)] = ... // 36000 records
val toData = data.collect()
val docSims = data.map { x =>
  val fromId = x._1
  val docsims = toData.filter { y => y._1 != fromId }.map { y =>
    val score = 1 - cosDisticnce(x._2, y._2)
    (y._1, score)
  }.toList.sortWith { (a, b) => a._2 > b._2 }.take(20)
  (fromId, docsims)
}
docSims.writeAsText(file)
When I run the job on YARN, I get an error; the message is the following:

java.lang.InterruptedException
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
  at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
  at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)


Can someone tell me? Thank you.
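
For reference, one common way to make the full document set available inside a map function without capturing the collected list in the closure is the DataSet broadcast-variable API. The following is only a rough sketch, assuming cosDisticnce returns a Double; it does not address the InterruptedException itself, which is raised by the YARN client:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

val docSims = data
  .map(new RichMapFunction[(String, SparseVector), (String, List[(String, Double)])] {
    var docs: Seq[(String, SparseVector)] = _

    override def open(parameters: Configuration): Unit = {
      // Materialized once per task from the broadcast set registered below.
      docs = getRuntimeContext
        .getBroadcastVariable[(String, SparseVector)]("docs")
        .asScala
    }

    override def map(x: (String, SparseVector)): (String, List[(String, Double)]) = {
      val sims = docs
        .filter(_._1 != x._1)
        .map(y => (y._1, 1 - cosDisticnce(x._2, y._2)))
        .sortWith(_._2 > _._2)
        .take(20)
        .toList
      (x._1, sims)
    }
  })
  .withBroadcastSet(data, "docs")

docSims.writeAsText(file)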

Re: RocksDB segfaults

2017-03-23 Thread Robert Metzger
Florian, can you post the log of the Taskmanager where the segfault
happened ?

On Wed, Mar 22, 2017 at 6:19 PM, Stefan Richter  wrote:

> Hi,
>
> for the first checkpoint, from the stacktrace I assume that the backend is
> not accessed as part of processing an element, but by another thread. Is
> that correct? RocksDB requires accessing threads to hold the task’s
> checkpointing lock, otherwise they might call methods on an instance that
> is already disposed. However, this should only happen when the task was
> already about to shutdown anyways. Is that a plausible explanation for your
> observed behaviour? I can also not rule out that segfaults can happen
> inside RocksDB or due to the JNI bridge.
>
> Best,
> Stefan
>
> > Am 22.03.2017 um 16:53 schrieb Florian König  >:
> >
> > Hi Stephen,
> >
> > you are right, the second stack trace is indeed from a run of Flink
> 1.1.4. Sorry, my bad.
> >
> > That leaves us with the first trace of a segfault for which I can
> guarantee that it brought down a 1.2.0 instance. Unfortunately I cannot
> reproduce the problem. It has happened twice so far, but I can’t see any
> pattern. Is there anything in the stack trace that could point us to a
> probable cause?
> >
> > Florian
> >
> >> Am 22.03.2017 um 16:00 schrieb Stephan Ewen :
> >>
> >> Hi!
> >>
> >> It looks like you are running the RocksDB state backend 1.1 (is still
> an old version packaged into your JAR file?)
> >>
> >> This line indicates that: org.apache.flink.contrib.streaming.state.
> RocksDBStateBackend.performSemiAsyncSnapshot (the method does not exist
> in 1.2 any more)
> >>
> >> Can you try and run 1.2 and see if that still occurs? In general, I
> cannot vouch 100% for RocksDBs JNI API, but in our 1.2 tests so far it was
> stable.
> >>
> >> Stephan
> >>
> >>
> >>
> >> On Wed, Mar 22, 2017 at 3:13 PM, Florian König <
> florian.koe...@micardo.com> wrote:
> >> Hi,
> >>
> >> I have experienced two crashes of Flink 1.2 by SIGSEGV, caused by
> something in RocksDB. What is the preferred way to report them? All I got
> at the moment are two hs_err_pid12345.log files. They are over 4000 lines
> long each. Is there anything significant that I should extract to help you
> guys and/or put into a JIRA ticket?
> >>
> >> The first thing that came to my mind was the stack traces (see below).
> Anything else?
> >>
> >> Thanks
> >> Florian
> >>
> >> 
> >>
> >> Stack: [0x7fec04341000,0x7fec04442000],
> sp=0x7fec0443ff48,  free space=1019k
> >> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
> >> J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @
> 0x7fec925887cc [0x7fec92588780+0x4c]
> >> J 27241 C2 org.apache.flink.contrib.streaming.state.
> RocksDBValueState.value()Ljava/lang/Object; (78 bytes) @
> 0x7fec94010ca4 [0x7fec940109c0+0x2e4]
> >> j  com.micardo.backend.TransitionProcessor$2.
> getValue()Ljava/lang/Long;+7
> >> j  com.micardo.backend.TransitionProcessor$2.
> getValue()Ljava/lang/Object;+1
> >> J 38483 C2 org.apache.flink.runtime.metrics.dump.
> MetricDumpSerialization.serializeGauge(Ljava/io/
> DataOutput;Lorg/apache/flink/runtime/metrics/dump/
> QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V (114
> bytes) @ 0x7fec918eabf0 [0x7fec918eabc0+0x30]
> >> J 38522 C2 org.apache.flink.runtime.metrics.dump.
> MetricDumpSerialization$MetricDumpSerializer.serialize(Ljava/util/Map;
> Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/
> flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult;
> (471 bytes) @ 0x7fec94eb6260 [0x7fec94eb57a0+0xac0]
> >> J 47531 C2 org.apache.flink.runtime.metrics.dump.
> MetricQueryService.onReceive(Ljava/lang/Object;)V (453 bytes) @
> 0x7fec95ca57a0 [0x7fec95ca4da0+0xa00]
> >> J 5815 C2 akka.actor.UntypedActor.aroundReceive(Lscala/
> PartialFunction;Ljava/lang/Object;)V (7 bytes) @ 0x7fec91e3ae6c
> [0x7fec91e3adc0+0xac]
> >> J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104
> bytes) @ 0x7fec91d5bc44 [0x7fec91d5b9a0+0x2a4]
> >> J 6628 C2 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V (60 bytes) @
> 0x7fec9212d050 [0x7fec9212ccc0+0x390]
> >> J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182
> bytes) @ 0x7fec923f8170 [0x7fec923f7fc0+0x1b0]
> >> v  ~StubRoutines::call_stub
> >>
> >> --
> >>
> >> Stack: [0x7f167a5b7000,0x7f167a6b8000],
> sp=0x7f167a6b5f40,  free space=1019k
> >> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code,
> C=native code)
> >> C  [libstdc++.so.6+0xc026b]  std::basic_string std::char_traits, std::allocator >::basic_string(std::string
> const&)+0xb
> >> C  [librocksdbjni8426686507832168508.so+0x2f14ca]
> rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions
> 

Re: Threading issue

2017-03-23 Thread Robert Metzger
Hi,

how many unique combinations of your key
"partition","threadNumber","schemaId" exist?
In my opinion, all sinks should receive data if there are enough different
keys.

On Wed, Mar 22, 2017 at 3:41 AM, Telco Phone  wrote:

> I am looking to get  readers from kafka / keyBy and Sink working with all
> 60 threads.
>
> For the most part it is working correctly
>
> DataStream stream =
>     env.addSource(new FlinkKafkaConsumer08<>("kafkatopic", schema, properties))
>        .setParallelism(60)
>        .flatMap(new SchemaRecordSplit()).setParallelism(60)
>        .name("RawAdActivity splitter")
>        .keyBy("partition", "threadNumber", "schemaId");
>
> stream.addSink(new CustomMaprFsSink()).setParallelism(60).name("RawAdActivity Sink");
>
> However I only get about 24-30 sinks writing data
>
>
> Now the kafka payload I am reading is based on time / schema so to help
> out I put in a random number generator and group by it as well so that it
> will "try" to force 60 sinks receiving data and writing to HDFS.
>
> Any thoughts with the above code that I can somehow "help" it make sure
> that during the hour for the most part I should have all 60 reading /
> sorting / Sinking (writing to file system)
>
>


Re: [POLL] Who still uses Java 7 with Flink ?

2017-03-23 Thread Theodore Vasiloudis
Hello all,

I'm sure you've considered this already, but what this data does not
include is all the potential future users,
i.e. slower moving organizations (banks etc.) which could be on Java 7
still.

Whether those are relevant is up for debate.

Cheers,
Theo

On Thu, Mar 23, 2017 at 12:14 PM, Robert Metzger 
wrote:

> Yeah, you are right :)
> I'll put something in my calendar for end of May.
>
> On Thu, Mar 23, 2017 at 12:12 PM, Greg Hogan  wrote:
>
>> Robert,
>>
>> Thanks for the report. Shouldn’t we be revisiting this decision at the
>> beginning of the new release cycle rather than near the end? There is
>> currently little cost to staying with Java 7 since no Flink code or pull
>> requests have been written for Java 8.
>>
>> Greg
>>
>>
>>
>> On Mar 23, 2017, at 6:37 AM, Robert Metzger  wrote:
>>
>> Looks like 9% on twitter and 24% on the mailing list are still using Java
>> 7.
>>
>> I would vote to keep supporting Java 7 for Flink 1.3 and then revisit
>> once we are approaching 1.4 in September.
>>
>> On Thu, Mar 16, 2017 at 8:00 AM, Bowen Li 
>> wrote:
>>
>>> There's always a tradeoff we need to make. I'm in favor of upgrading to
>>> Java 8 to bring in all new Java features.
>>>
>>> The common way I've seen (and I agree) other software upgrading major
>>> things like this is 1) upgrade for next big release without backward
>>> compatibility and notify everyone 2) maintain and patch current, old-tech
>>> compatible version at a reasonably limited scope. Building backward
>>> compatibility is too much for an open sourced project
>>>
>>>
>>>
>>> On Wed, Mar 15, 2017 at 7:10 AM, Robert Metzger 
>>> wrote:
>>>
 I've put it also on our Twitter account:
 https://twitter.com/ApacheFlink/status/842015062667755521

 On Wed, Mar 15, 2017 at 2:19 PM, Martin Neumann 
 wrote:

 > I think this easier done in a straw poll than in an email
 conversation.
 > I created one at: http://www.strawpoll.me/12535073
 > (Note that you have multiple choices.)
 >
 >
 > Though I prefer Java 8 most of the time I have to work on Java 7. A
 lot of
 > the infrastructure I work on still runs Java 7, one of the companies I
 > build a prototype for a while back just updated to Java 7 2 years
 ago. I
 > doubt we can ditch Java 7 support any time soon if we want to make it
 easy
 > for companies to use Flink.
 >
 > cheers Martin
 >
 > //PS sorry if this gets sent twice, we just migrated to a new mail
 system
 > and a lot of things are broken
 >
 > 
 > From: Stephan Ewen 
 > Sent: Wednesday, March 15, 2017 12:30:24 PM
 > To: user@flink.apache.org; d...@flink.apache.org
 > Subject: [POLL] Who still uses Java 7 with Flink ?
 >
 > Hi all!
 >
 > I would like to get a feeling how much Java 7 is still being used
 among
 > Flink users.
 >
 > At some point, it would be great to drop Java 7 support and make use
 of
 > Java 8's new features, but first we would need to get a feeling how
 much
 > Java 7 is still used.
 >
 > Would be happy if users on Java 7 respond here, or even users that
 have
 > some insights into how widespread they think Java 7 still is.
 >
 > Thanks,
 > Stephan
 >
 >
 >
 >
 >

>>>
>>>
>>
>>
>


Re: Benchmarking streaming frameworks

2017-03-23 Thread Michael Noll
A recent one is "Analytics on Fast Data: Main-Memory Database Systems
versus Modern Streaming Systems" (
http://db.in.tum.de/~kipf/papers/fastdata.pdf)

For the record, the paper above doesn't yet cover/realize that, nowadays,
the Kafka project includes native stream processing capabilities aka the
Kafka Streams API.

-Michael


On Thu, Mar 23, 2017 at 2:00 PM, Felix Neutatz 
wrote:

> Hi,
>
> our team already created a benchmark framework for batch processing
> (including MR,Yarn, Spark, Flink), maybe you like to extend it for
> streaming: https://github.com/peelframework/peel
>
> Best regards,
> Felix
>
>
> On Mar 23, 2017 11:51, "Christophe Salperwyck" <
> christophe.salperw...@gmail.com> wrote:
>
> Good idea! You could test Akka streams too.
>
> Lots of documents exist:
> https://yahooeng.tumblr.com/post/135321837876/benchmarking-s
> treaming-computation-engines-at
> https://github.com/yahoo/streaming-benchmarks
>
> Cheers,
> Christophe
>
> 2017-03-23 11:09 GMT+01:00 Giselle van Dongen 
> :
>
>> Dear users of Streaming Technologies,
>>
>> As a PhD student in big data analytics, I am currently in the process of
>> compiling a list of benchmarks (to test multiple streaming frameworks) in
>> order to create an expanded benchmarking suite. The benchmark suite is
>> being
>> developed as a part of my current work at Ghent University.
>>
>> The included frameworks at this time are, in no particular order, Spark,
>> Flink, Kafka (Streams), Storm (Trident) and Drizzle. Any pointers to
>> previous work or relevant benchmarks would be appreciated.
>>
>> Best regards,
>> Giselle van Dongen
>>
>
>
>


Re: Benchmarking streaming frameworks

2017-03-23 Thread Felix Neutatz
Hi,

our team already created a benchmark framework for batch processing
(including MR,Yarn, Spark, Flink), maybe you like to extend it for
streaming: https://github.com/peelframework/peel

Best regards,
Felix

On Mar 23, 2017 11:51, "Christophe Salperwyck"  wrote:

Good idea! You could test Akka streams too.

Lots of documents exist:
https://yahooeng.tumblr.com/post/135321837876/benchmarking-
streaming-computation-engines-at
https://github.com/yahoo/streaming-benchmarks

Cheers,
Christophe

2017-03-23 11:09 GMT+01:00 Giselle van Dongen :

> Dear users of Streaming Technologies,
>
> As a PhD student in big data analytics, I am currently in the process of
> compiling a list of benchmarks (to test multiple streaming frameworks) in
> order to create an expanded benchmarking suite. The benchmark suite is
> being
> developed as a part of my current work at Ghent University.
>
> The included frameworks at this time are, in no particular order, Spark,
> Flink, Kafka (Streams), Storm (Trident) and Drizzle. Any pointers to
> previous work or relevant benchmarks would be appreciated.
>
> Best regards,
> Giselle van Dongen
>


Re: Windows emit results at the end of the stream

2017-03-23 Thread Sonex
Thank you for your response Yassine,

I forgot to mention that I use the Scala API. In Scala the equivalent code
is:

val inputFormat = new TextInputFormat(new Path("file/to/read.txt"))
env.readFile(inputFormat, "file/to/read.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)

Am I correct?

But I noticed weird behavior now. Sometimes it never starts to process the
elements of the file, and sometimes it stops in the middle of the file
without processing the rest of it. Why does that happen?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337p12356.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: A question about iterations and prioritizing "control" over "data" inputs

2017-03-23 Thread Paris Carbone
Unless I got this wrong, if he meant relaxing FIFO processing per 
channel/stream partition then Robert is absolutely right.

On 23 Mar 2017, at 12:28, Paris Carbone > 
wrote:

I think what Theo meant is to allow for different (high/low) priorities on
different channels (or data streams per se) for n-ary operators such as
ConnectedStream binary maps, loops, etc., not to change the sequence of events
within channels, I guess.

This does not violate the FIFO channel assumptions of the checkpointing 
algorithm. The checkpoint barriers anyway block committed stream partitions so 
there is no priority concern there.

On 23 Mar 2017, at 12:13, Robert Metzger 
> wrote:

To very quickly respond to Theo's question: No, it is not possible to have 
records overtake each other in the buffer.
This could potentially void the exactly once processing guarantees, in the case 
when records overtake checkpoint barriers.


On Wed, Mar 15, 2017 at 5:58 PM, Kathleen Sharp 
> wrote:
Hi,

I have a similar sounding use case and just yesterday was
experimenting with this approach:

Use 2 separate streams: one for model events, one for data events.
Connect these 2, key the resulting stream and then use a
RichCoFlatMapFunction to ensure that each data event is enriched with
the latest model event as soon as a new model event arrives.
Also, as soon as a new model arrives emit all previously seen events
with this new model events.
This involves keeping events and models in state.
My emitted enriched events have a command-like syntax (add/remove) so
that downstream operators can remove/add as necessary depending on the
calculations (so for each model change I would emit an add/remove pair
of enriched events).

As I say I have only experimented with this yesterday, perhaps someone
a bit more experienced with flink might spot some problems with this
approach, which I would definitely be interested in hearing.

Kat
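
For illustration, a minimal Scala sketch of the connect/keyBy/RichCoFlatMapFunction pattern described above. The event and model types, field names, and key choice are assumptions, and the re-emission of previously seen events on a model change is omitted (it would additionally need a ListState of events):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

case class Model(key: String, version: Long)
case class Event(key: String, value: Double)

class EnrichWithLatestModel extends RichCoFlatMapFunction[Event, Model, (Event, Model)] {
  private var latestModel: ValueState[Model] = _

  override def open(parameters: Configuration): Unit = {
    latestModel = getRuntimeContext.getState(
      new ValueStateDescriptor[Model]("latest-model", classOf[Model]))
  }

  // Data events are enriched with the most recent model seen for this key, if any.
  override def flatMap1(event: Event, out: Collector[(Event, Model)]): Unit = {
    val model = latestModel.value()
    if (model != null) out.collect((event, model))
  }

  // Model events only update the keyed state.
  override def flatMap2(model: Model, out: Collector[(Event, Model)]): Unit = {
    latestModel.update(model)
  }
}

// Usage (sketch):
// dataEvents.connect(modelUpdates).keyBy(_.key, _.key).flatMap(new EnrichWithLatestModel)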

On Wed, Mar 15, 2017 at 2:20 PM, Theodore Vasiloudis
> wrote:
> Hello all,
>
> I've started thinking about online learning in Flink and one of the issues
> that has come
> up in other frameworks is the ability to prioritize "control" over "data"
> events in iterations.
>
> To set an example, say we develop an ML model, that ingests events in
> parallel, performs
> an aggregation to update the model, and then broadcasts the updated model to
> back through
> an iteration/back edge. Using the above nomenclature the events being
> ingested would be
> "data" events, and the model update would a "control" event.
>
> I talked about this scenario a bit with couple of people (Paris and
> Gianmarco) and one thing
> we would like to have is the ability to prioritize the ingestion of control
> events over the data events.
>
> If my understanding is correct, currently there is a buffer/queue of events
> waiting to be processed
> for each operator, and each incoming event ends up at the end of that queue.
>
> If our data source is fast, and the model updates slow, a lot of data events
> might be buffered/scheduled
> to be processed before each model update, because of the speed difference
> between the two
> streams. But we would like to update the model that is used to process data
> events as soon as
> the newest version becomes available.
>
> Is it somehow possible to make the control events "jump" the queue and be
> processed as soon
> as they arrive over the data events?
>
> Regards,
> Theodore
>
> P.S. This is still very much a theoretical problem, I haven't looked at how
> such a pipeline would
> be implemented in Flink.





Re: A question about iterations and prioritizing "control" over "data" inputs

2017-03-23 Thread Paris Carbone
I think what Theo meant is to allow for different (high/low) priorities on
different channels (or data streams per se) for n-ary operators such as
ConnectedStream binary maps, loops, etc., not to change the sequence of events
within channels, I guess.

This does not violate the FIFO channel assumptions of the checkpointing 
algorithm. The checkpoint barriers anyway block committed stream partitions so 
there is no priority concern there.

On 23 Mar 2017, at 12:13, Robert Metzger 
> wrote:

To very quickly respond to Theo's question: No, it is not possible to have 
records overtake each other in the buffer.
This could potentially void the exactly once processing guarantees, in the case 
when records overtake checkpoint barriers.


On Wed, Mar 15, 2017 at 5:58 PM, Kathleen Sharp 
> wrote:
Hi,

I have a similar sounding use case and just yesterday was
experimenting with this approach:

Use 2 separate streams: one for model events, one for data events.
Connect these 2, key the resulting stream and then use a
RichCoFlatMapFunction to ensure that each data event is enriched with
the latest model event as soon as a new model event arrives.
Also, as soon as a new model arrives emit all previously seen events
with this new model events.
This involves keeping events and models in state.
My emitted enriched events have a command-like syntax (add/remove) so
that downstream operators can remove/add as necessary depending on the
calculations (so for each model change I would emit an add/remove pair
of enriched events).

As I say I have only experimented with this yesterday, perhaps someone
a bit more experienced with flink might spot some problems with this
approach, which I would definitely be interested in hearing.

Kat

On Wed, Mar 15, 2017 at 2:20 PM, Theodore Vasiloudis
> wrote:
> Hello all,
>
> I've started thinking about online learning in Flink and one of the issues
> that has come
> up in other frameworks is the ability to prioritize "control" over "data"
> events in iterations.
>
> To set an example, say we develop an ML model, that ingests events in
> parallel, performs
> an aggregation to update the model, and then broadcasts the updated model
> back through
> an iteration/back edge. Using the above nomenclature the events being
> ingested would be
> "data" events, and the model update would a "control" event.
>
> I talked about this scenario a bit with a couple of people (Paris and
> Gianmarco) and one thing
> we would like to have is the ability to prioritize the ingestion of control
> events over the data events.
>
> If my understanding is correct, currently there is a buffer/queue of events
> waiting to be processed
> for each operator, and each incoming event ends up at the end of that queue.
>
> If our data source is fast, and the model updates slow, a lot of data events
> might be buffered/scheduled
> to be processed before each model update, because of the speed difference
> between the two
> streams. But we would like to update the model that is used to process data
> events as soon as
> the newest version becomes available.
>
> Is it somehow possible to make the control events "jump" the queue and be
> processed as soon as they arrive, ahead of the data events?
>
> Regards,
> Theodore
>
> P.S. This is still very much a theoretical problem, I haven't looked at how
> such a pipeline would
> be implemented in Flink.




Re: [POLL] Who still uses Java 7 with Flink ?

2017-03-23 Thread Robert Metzger
Yeah, you are right :)
I'll put something in my calendar for end of May.

On Thu, Mar 23, 2017 at 12:12 PM, Greg Hogan  wrote:

> Robert,
>
> Thanks for the report. Shouldn’t we be revisiting this decision at the
> beginning of the new release cycle rather than near the end? There is
> currently little cost to staying with Java 7 since no Flink code or pull
> requests have been written for Java 8.
>
> Greg
>
>
>
> On Mar 23, 2017, at 6:37 AM, Robert Metzger  wrote:
>
> Looks like 9% on twitter and 24% on the mailing list are still using Java
> 7.
>
> I would vote to keep supporting Java 7 for Flink 1.3 and then revisit once
> we are approaching 1.4 in September.
>
> On Thu, Mar 16, 2017 at 8:00 AM, Bowen Li  wrote:
>
>> There's always a tradeoff we need to make. I'm in favor of upgrading to
>> Java 8 to bring in all new Java features.
>>
>> The common way I've seen (and I agree with) other software handle major
>> upgrades like this is: 1) upgrade in the next big release without backward
>> compatibility and notify everyone; 2) maintain and patch the current, old-tech
>> compatible version at a reasonably limited scope. Building backward
>> compatibility is too much for an open-source project
>>
>>
>>
>> On Wed, Mar 15, 2017 at 7:10 AM, Robert Metzger 
>> wrote:
>>
>>> I've put it also on our Twitter account:
>>> https://twitter.com/ApacheFlink/status/842015062667755521
>>>
>>> On Wed, Mar 15, 2017 at 2:19 PM, Martin Neumann 
>>> wrote:
>>>
>>> > I think this is easier done in a straw poll than in an email conversation.
>>> > I created one at: http://www.strawpoll.me/12535073
>>> > (Note that you have multiple choices.)
>>> >
>>> >
>>> > Though I prefer Java 8, most of the time I have to work on Java 7. A
>>> lot of
>>> > the infrastructure I work on still runs Java 7; one of the companies I
>>> > built a prototype for a while back just updated to Java 7 two years ago.
>>> I
>>> > doubt we can ditch Java 7 support any time soon if we want to make it
>>> easy
>>> > for companies to use Flink.
>>> >
>>> > cheers Martin
>>> >
>>> > //PS sorry if this gets sent twice, we just migrated to a new mail
>>> system
>>> > and a lot of things are broken
>>> >
>>> > 
>>> > From: Stephan Ewen 
>>> > Sent: Wednesday, March 15, 2017 12:30:24 PM
>>> > To: user@flink.apache.org; d...@flink.apache.org
>>> > Subject: [POLL] Who still uses Java 7 with Flink ?
>>> >
>>> > Hi all!
>>> >
>>> > I would like to get a feeling how much Java 7 is still being used among
>>> > Flink users.
>>> >
>>> > At some point, it would be great to drop Java 7 support and make use of
>>> > Java 8's new features, but first we would need to get a feeling how
>>> much
>>> > Java 7 is still used.
>>> >
>>> > Would be happy if users on Java 7 respond here, or even users that have
>>> > some insights into how widespread they think Java 7 still is.
>>> >
>>> > Thanks,
>>> > Stephan
>>> >
>>> >
>>> >
>>> >
>>> >
>>>
>>
>>
>
>


Re: A question about iterations and prioritizing "control" over "data" inputs

2017-03-23 Thread Robert Metzger
To very quickly respond to Theo's question: No, it is not possible to have
records overtake each other in the buffer.
This could potentially void the exactly once processing guarantees, in the
case when records overtake checkpoint barriers.


On Wed, Mar 15, 2017 at 5:58 PM, Kathleen Sharp  wrote:

> Hi,
>
> I have a similar sounding use case and just yesterday was
> experimenting with this approach:
>
> Use 2 separate streams: one for model events, one for data events.
> Connect these 2, key the resulting stream and then use a
> RichCoFlatMapFunction to ensure that each data event is enriched with
> the latest model event as soon as a new model event arrives.
> Also, as soon as a new model arrives, emit all previously seen events
> with this new model event.
> This involves keeping events and models in state.
> My emitted enriched events have a command-like syntax (add/remove) so
> that downstream operators can remove/add as necessary depending on the
> calculations (so for each model change I would emit an add/remove pair
> of enriched events).
>
> As I say I have only experimented with this yesterday, perhaps someone
> a bit more experienced with flink might spot some problems with this
> approach, which I would definitely be interested in hearing.
>
> Kat
>
> On Wed, Mar 15, 2017 at 2:20 PM, Theodore Vasiloudis
>  wrote:
> > Hello all,
> >
> > I've started thinking about online learning in Flink and one of the
> issues
> > that has come
> > up in other frameworks is the ability to prioritize "control" over "data"
> > events in iterations.
> >
> > To set an example, say we develop an ML model, that ingests events in
> > parallel, performs
> > an aggregation to update the model, and then broadcasts the updated
> > model back through
> > an iteration/back edge. Using the above nomenclature the events being
> > ingested would be
> > "data" events, and the model update would a "control" event.
> >
> > I talked about this scenario a bit with a couple of people (Paris and
> > Gianmarco) and one thing
> > we would like to have is the ability to prioritize the ingestion of
> control
> > events over the data events.
> >
> > If my understanding is correct, currently there is a buffer/queue of
> events
> > waiting to be processed
> > for each operator, and each incoming event ends up at the end of that
> queue.
> >
> > If our data source is fast, and the model updates slow, a lot of data
> events
> > might be buffered/scheduled
> > to be processed before each model update, because of the speed difference
> > between the two
> > streams. But we would like to update the model that is used to process
> data
> > events as soon as
> > the newest version becomes available.
> >
> > Is it somehow possible to make the control events "jump" the queue and be
> > processed as soon as they arrive, ahead of the data events?
> >
> > Regards,
> > Theodore
> >
> > P.S. This is still very much a theoretical problem, I haven't looked at
> how
> > such a pipeline would
> > be implemented in Flink.
>


Re: [POLL] Who still uses Java 7 with Flink ?

2017-03-23 Thread Greg Hogan
Robert,

Thanks for the report. Shouldn’t we be revisiting this decision at the 
beginning of the new release cycle rather than near the end? There is currently 
little cost to staying with Java 7 since no Flink code or pull requests have 
been written for Java 8.

Greg



> On Mar 23, 2017, at 6:37 AM, Robert Metzger  wrote:
> 
> Looks like 9% on twitter and 24% on the mailing list are still using Java 7.
> 
> I would vote to keep supporting Java 7 for Flink 1.3 and then revisit once we 
> are approaching 1.4 in September.
> 
> On Thu, Mar 16, 2017 at 8:00 AM, Bowen Li  > wrote:
> There's always a tradeoff we need to make. I'm in favor of upgrading to Java 
> 8 to bring in all new Java features.
> 
> The common way I've seen (and I agree with) other software handle major upgrades
> like this is: 1) upgrade in the next big release without backward compatibility
> and notify everyone; 2) maintain and patch the current, old-tech compatible
> version at a reasonably limited scope. Building backward compatibility is too
> much for an open-source project
> 
> 
> 
> On Wed, Mar 15, 2017 at 7:10 AM, Robert Metzger  > wrote:
> I've put it also on our Twitter account:
> https://twitter.com/ApacheFlink/status/842015062667755521 
> 
> 
> On Wed, Mar 15, 2017 at 2:19 PM, Martin Neumann  >
> wrote:
> 
> > I think this is easier done in a straw poll than in an email conversation.
> > I created one at: http://www.strawpoll.me/12535073 
> > 
> > (Note that you have multiple choices.)
> >
> >
> > Though I prefer Java 8, most of the time I have to work on Java 7. A lot of
> > the infrastructure I work on still runs Java 7; one of the companies I
> > built a prototype for a while back just updated to Java 7 two years ago. I
> > doubt we can ditch Java 7 support any time soon if we want to make it easy
> > for companies to use Flink.
> >
> > cheers Martin
> >
> > //PS sorry if this gets sent twice, we just migrated to a new mail system
> > and a lot of things are broken
> >
> > 
> > From: Stephan Ewen >
> > Sent: Wednesday, March 15, 2017 12:30:24 PM
> > To: user@flink.apache.org ; 
> > d...@flink.apache.org 
> > Subject: [POLL] Who still uses Java 7 with Flink ?
> >
> > Hi all!
> >
> > I would like to get a feeling how much Java 7 is still being used among
> > Flink users.
> >
> > At some point, it would be great to drop Java 7 support and make use of
> > Java 8's new features, but first we would need to get a feeling how much
> > Java 7 is still used.
> >
> > Would be happy if users on Java 7 respond here, or even users that have
> > some insights into how widespread they think Java 7 still is.
> >
> > Thanks,
> > Stephan
> >
> >
> >
> >
> >
> 
> 



Re: Telling if a job has caught up with Kafka

2017-03-23 Thread Robert Metzger
Sorry for joining this discussion late, but there is already a metric for
the offset lag in our 0.9+ consumers.
It's called the "records-lag-max":
https://kafka.apache.org/documentation/#new_consumer_fetch_monitoring and
it's exposed via Flink's metrics system.
The only issue is that it only shows the maximum lag across all partitions,
not detailed per-partition metrics.
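
If it helps, the snippet below is a sketch of one way to surface that metric, by registering a JMX metrics reporter in flink-conf.yaml. The reporter name "my_jmx" and the port range are arbitrary placeholders, and the exact keys should be double-checked against the metrics documentation for your Flink version:

# flink-conf.yaml (sketch): register a JMX reporter so that task metrics,
# including the Kafka consumer's "records-lag-max", become visible via JMX.
metrics.reporters: my_jmx
metrics.reporter.my_jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx.port: 9020-9040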

On Mon, Mar 20, 2017 at 3:43 PM, Bruno Aranda  wrote:

> Hi,
>
> Thanks! The proposal sounds very good to us too.
>
> Bruno
>
> On Sun, 19 Mar 2017 at 10:57 Florian König 
> wrote:
>
>> Thanks Gordon for the detailed explanation! That makes sense and explains
>> the expected behaviour.
>>
>> The JIRA for the new metric also sounds very good. Can’t wait to have
>> this in the Flink GUI (KafkaOffsetMonitor has some problems and stops
>> working after 1-2 days, don’t know the reason yet).
>>
>> All the best,
>> Florian
>>
>>
>> > On 18.03.2017 at 08:38, Tzu-Li (Gordon) Tai wrote:
>> >
>> > @Florian
>> > the 0.9 / 0.10 version and 0.8 version behave a bit differently right
>> now for the offset committing.
>> >
>> > In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable”
>> etc. settings will be completely ignored and overwritten before being used to
>> instantiate the internal Kafka clients, hence committing will only happen
>> on Flink checkpoints.
>> >
>> > In 0.8, this isn’t the case. Both automatic periodic committing and
>> committing on checkpoints can take place. That’s perhaps why you’re
>> observing the 0.8 consumer to be committing more frequently.
>> >
>> > FYI: This behaviour will be unified in Flink 1.3.0. If you’re
>> interested, you can take a look at https://github.com/apache/flink/pull/3527.
>> >
>> > - Gordon
>> >
>> >
>> > On March 17, 2017 at 6:07:38 PM, Florian König (
>> florian.koe...@micardo.com) wrote:
>> >
>> >> Why is that so? The checkpoint contains the Kafka offset and would be
>> able to start reading wherever it left off, regardless of any offset stored
>> in Kafka or Zookeeper. Why is the offset not committed regularly,
>> independently from the checkpointing? Or did I misconfigure anything?
>>
>>
>>
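
To make the above concrete, here is a small sketch (not from the thread) of a 0.9 consumer running with checkpointing enabled, i.e. the mode where offsets are committed back to Kafka only on completed checkpoints. The topic name, bootstrap servers and group id are placeholder values.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaOffsetCommitSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000); // checkpoint (and hence commit offsets to Kafka) every 60 seconds

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-flink-job");

        env.addSource(new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("kafka offset commit sketch");
    }
}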


Re: Benchmarking streaming frameworks

2017-03-23 Thread Christophe Salperwyck
Good idea! You could test Akka streams too.

Lots of documents exist:
https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
https://github.com/yahoo/streaming-benchmarks

Cheers,
Christophe

2017-03-23 11:09 GMT+01:00 Giselle van Dongen :

> Dear users of Streaming Technologies,
>
> As a PhD student in big data analytics, I am currently in the process of
> compiling a list of benchmarks (to test multiple streaming frameworks) in
> order to create an expanded benchmarking suite. The benchmark suite is
> being
> developed as a part of my current work at Ghent University.
>
> The included frameworks at this time are, in no particular order, Spark,
> Flink, Kafka (Streams), Storm (Trident) and Drizzle. Any pointers to
> previous work or relevant benchmarks would be appreciated.
>
> Best regards,
> Giselle van Dongen
>


Re: Windows emit results at the end of the stream

2017-03-23 Thread Yassine MARZOUGUI
Hi Sonex,

When using readTextFile(...) with event time, only one watermark with the
value Long.MAX_VALUE is sent at the end of the stream, which explains why
the windows are stored until the whole file is processed. In order to have
periodic watermarks, you need to process the file continuously, as follows:

TextInputFormat inputFormat = new TextInputFormat(new Path("file/to/read.txt"));
// PROCESS_CONTINUOUSLY keeps the source running and monitors the path at the
// given interval (1 ms here) instead of finishing after a single pass.
env.readFile(inputFormat, "file/to/read.txt",
        FileProcessingMode.PROCESS_CONTINUOUSLY, 1L,
        TypeInformation.of(String.class))
   .map(...)

Hope this helps.

Best,
Yassine

2017-03-23 9:47 GMT+01:00 Sonex :

> Hi everyone,
>
> I am using a simple window computation on a stream with event time. The
> code
> looks like this:
>
> streamData.readTextFile(...)
> .map(...)
> .assignAscendingTimestamps(_.timestamp)
> .keyBy(_.id)
> .timeWindow(Time.seconds(3600),Time.seconds(3600))
> .apply(new MyWindowFunction)
> .map(...)
>
> By monitoring the memory usage and the flink web dashboard, I noticed that
> flink applies the window function until the entire stream finishes (thus
> storing all aggregations in memory) and then continues to the map
> transformation. What I would expect is emission of window results to the
> map
> transformation as soon as results of the window are ready.
>
> Can anyone explain this behavior?
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Windows-emit-
> results-at-the-end-of-the-stream-tp12337.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Cassandra Sink version

2017-03-23 Thread Nancy Estrada
The documentation you mentioned says: "The Java client driver 3.0.7 (branch
3.0.x) is compatible with Apache Cassandra 1.2, 2.0, 2.1, 2.2 and 3.0".

Thank you Kostas!



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cassandra-Sink-version-tp12329p12340.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


[ANNOUNCE] Apache Flink 1.1.5 Released

2017-03-23 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is pleased to announce the availability of Flink 
1.1.5, which is the next bugfix release for the 1.1 series.
The official release announcement: 
https://flink.apache.org/news/2017/03/23/release-1.1.5.html
Release binaries: http://apache.lauf-forum.at/flink/flink-1.1.5
For users of the Flink 1.1 series, please update your Maven dependencies to the 
new 1.1.5 version and update your binaries.
On behalf of the community, I would like to thank everybody who contributed to 
the release.
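
For reference, the dependency bump in a Maven pom.xml looks roughly like the sketch below; the Scala 2.11 streaming artifact is only an example, so adjust the artifact ids to whatever your project already depends on.

<!-- Sketch: update a Flink 1.1.x dependency to 1.1.5 (Scala 2.11 artifact assumed). -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.5</version>
</dependency>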