[jira] [Assigned] (FLINK-4928) Implement FLIP-6 YARN Application Master Runner

2016-10-27 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-4928:
---

Assignee: shuai.xu

> Implement FLIP-6 YARN Application Master Runner
> ---
>
> Key: FLINK-4928
> URL: https://issues.apache.org/jira/browse/FLINK-4928
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
> Environment: {{flip-6}} feature branch
>Reporter: Stephan Ewen
>Assignee: shuai.xu
>
> The Application Master Runner is the master process started in a YARN 
> container when submitting the Flink-on-YARN job to YARN.
> It has the following data available:
>   - Flink jars
>   - Job jars
>   - JobGraph
>   - Environment variables
>   - Contextual information like security tokens and certificates
> Its responsibilities are the following:
>   - Read all configuration and environment variables, computing the effective 
> configuration
>   - Start all shared components (Rpc, HighAvailability Services)
>   - Start the ResourceManager
>   - Start the JobManager Runner
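The startup sequence above can be sketched in plain Java. All class and method names below are illustrative and do not correspond to Flink's actual API: configuration files and environment variables are merged into an effective configuration, then the shared services and the two master components are started in order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the runner's startup sequence; names are
// illustrative, not real Flink classes.
public class AppMasterRunnerSketch {

    // Step 1: compute the effective configuration, letting environment
    // variables override values read from configuration files.
    static Map<String, String> effectiveConfig(Map<String, String> fileConfig,
                                               Map<String, String> env) {
        Map<String, String> effective = new HashMap<>(fileConfig);
        effective.putAll(env);
        return effective;
    }

    // Steps 2-4: shared components first, then the ResourceManager, then
    // the JobManager runner; the list records the order for illustration.
    static List<String> startupOrder() {
        List<String> started = new ArrayList<>();
        started.add("RpcService");
        started.add("HighAvailabilityServices");
        started.add("ResourceManager");
        started.add("JobManagerRunner");
        return started;
    }
}
```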



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4927) Implement FLIP-6 YARN Resource Manager

2016-10-27 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-4927:
---

Assignee: shuai.xu

> Implement FLIP-6 YARN Resource Manager
> -
>
> Key: FLINK-4927
> URL: https://issues.apache.org/jira/browse/FLINK-4927
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
> Environment: {{flip-6}} feature branch
>Reporter: Stephan Ewen
>Assignee: shuai.xu
>
> The Flink YARN Resource Manager communicates with YARN's Resource Manager to 
> acquire and release containers.
> It is also responsible for notifying the JobManager eagerly about container 
> failures.
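A minimal sketch of the eager-notification behavior, in plain Java rather than Flink's or YARN's actual interfaces (all names are illustrative): containers are tracked as they are allocated, and a JobManager callback fires immediately when a container exits abnormally instead of waiting for the JobManager to discover the failure itself.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Illustrative sketch: acquire/release bookkeeping plus eager failure
// notification to a JobManager listener.
class YarnResourceManagerSketch {
    private final Set<String> liveContainers = new HashSet<>();
    private final Consumer<String> jobManagerFailureListener;

    YarnResourceManagerSketch(Consumer<String> jobManagerFailureListener) {
        this.jobManagerFailureListener = jobManagerFailureListener;
    }

    void onContainerAllocated(String containerId) {
        liveContainers.add(containerId);
    }

    void onContainerCompleted(String containerId, int exitCode) {
        liveContainers.remove(containerId);
        if (exitCode != 0) {
            // eager notification: fired as soon as YARN reports the failure
            jobManagerFailureListener.accept(containerId);
        }
    }

    int liveContainerCount() {
        return liveContainers.size();
    }
}
```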





[jira] [Created] (FLINK-4952) Add Scala API for KeyedStream.flatMap

2016-10-27 Thread Manu Zhang (JIRA)
Manu Zhang created FLINK-4952:
-

 Summary: Add Scala API for KeyedStream.flatMap
 Key: FLINK-4952
 URL: https://issues.apache.org/jira/browse/FLINK-4952
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Scala API
Reporter: Manu Zhang








[jira] [Created] (FLINK-4951) Better Javadocs for KeyedStream.flatMap

2016-10-27 Thread Manu Zhang (JIRA)
Manu Zhang created FLINK-4951:
-

 Summary: Better Javadocs for KeyedStream.flatMap
 Key: FLINK-4951
 URL: https://issues.apache.org/jira/browse/FLINK-4951
 Project: Flink
  Issue Type: Improvement
Reporter: Manu Zhang
Assignee: Aljoscha Krettek
Priority: Minor


It seems the Javadoc is just copied over from {{DataStream.flatMap}} rather 
than describing what the function is for.





[jira] [Commented] (FLINK-4378) Enable RollingSink to custom HDFS client configuration

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15614076#comment-15614076
 ] 

ASF GitHub Bot commented on FLINK-4378:
---

Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/2356
  
All of the tests in Travis have passed. Thanks for merging this. ^_^


> Enable RollingSink to custom HDFS client configuration
> --
>
> Key: FLINK-4378
> URL: https://issues.apache.org/jira/browse/FLINK-4378
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> Tuning the HDFS client configuration for different situations, for example 
> {{io.file.buffer.size}}, can make the rolling sink perform better. 





[GitHub] flink issue #2356: [FLINK-4378]Enable RollingSink to custom HDFS client conf...

2016-10-27 Thread wenlong88
Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/2356
  
All of the tests in Travis have passed. Thanks for merging this. ^_^


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15614058#comment-15614058
 ] 

ASF GitHub Bot commented on FLINK-4923:
---

Github user zhuhaifengleon commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85463825
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -347,6 +348,12 @@ public Task(
 
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, 
taskNameWithSubtask);
+
+   // add metrics for buffers
+   
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inputQueueLength", new 
TaskIOMetricGroup.InputBuffersGauge(this));
--- End diff --

Stephan, have you merged this already? If so, I will address it in a 
follow-up patch after discussing it with Chesnay.


> Expose input/output buffers and bufferPool usage as a metric for a Task
> ---
>
> Key: FLINK-4923
> URL: https://issues.apache.org/jira/browse/FLINK-4923
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> We should expose the following Metrics on the TaskIOMetricGroup:
>  1. Buffers.inputQueueLength: received buffers of InputGates for a task
>  2. Buffers.outputQueueLength: buffers of produced ResultPartitions for a task
>  3. Buffers.inPoolUsage: usage of InputGates buffer pool for a task
>  4. Buffers.outPoolUsage: usage of produced ResultPartitions buffer pool for 
> a task
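Conceptually, each of these metrics is a gauge that re-samples a live value on demand. A stripped-down sketch (not the actual TaskIOMetricGroup classes; names are illustrative) looks like:

```java
import java.util.function.IntSupplier;

// Illustrative gauge: it holds no state of its own and simply re-samples
// the supplied callback (e.g. an input queue's size) whenever it is read.
class QueueLengthGauge {
    private final IntSupplier queueLength;

    QueueLengthGauge(IntSupplier queueLength) {
        this.queueLength = queueLength;
    }

    int getValue() {
        return queueLength.getAsInt();
    }
}
```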





[GitHub] flink pull request #2693: [FLINK-4923] [Metrics] Expose input/output buffers...

2016-10-27 Thread zhuhaifengleon
Github user zhuhaifengleon commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85463825
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -347,6 +348,12 @@ public Task(
 
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, 
taskNameWithSubtask);
+
+   // add metrics for buffers
+   
this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inputQueueLength", new 
TaskIOMetricGroup.InputBuffersGauge(this));
--- End diff --

Stephan, have you merged this already? If so, I will address it in a 
follow-up patch after discussing it with Chesnay.




[jira] [Created] (FLINK-4950) Add support to include multiple Yarn application entries in Yarn properties file

2016-10-27 Thread Vijay Srinivasaraghavan (JIRA)
Vijay Srinivasaraghavan created FLINK-4950:
--

 Summary: Add support to include multiple Yarn application entries 
in Yarn properties file
 Key: FLINK-4950
 URL: https://issues.apache.org/jira/browse/FLINK-4950
 Project: Flink
  Issue Type: Task
  Components: YARN Client
Reporter: Vijay Srinivasaraghavan
Assignee: Vijay Srinivasaraghavan
Priority: Minor


When deploying Flink on YARN using the CLI, a YARN properties file is created 
in the /tmp directory and persisted with the application ID along with a few 
other properties. 

This JIRA addresses two changes to the current implementation.
1) The properties file should be created in the user home directory so that the 
configurations are not leaked
2) Support multiple application entries in the properties file
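One possible layout for the second change (hypothetical, not the current Flink format) is to prefix each property with its application ID, so that a single file can describe several deployments:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical parsing sketch: keys of the form "<appId>.<property>" are
// grouped per application, so one properties file can hold many entries.
class MultiAppYarnPropertiesSketch {
    static Map<String, Map<String, String>> byApplication(Properties props) {
        Map<String, Map<String, String>> apps = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            int dot = name.indexOf('.');
            if (dot <= 0) {
                continue; // skip keys without an application prefix
            }
            apps.computeIfAbsent(name.substring(0, dot), k -> new HashMap<>())
                .put(name.substring(dot + 1), props.getProperty(name));
        }
        return apps;
    }
}
```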





[jira] [Created] (FLINK-4949) Refactor Gelly driver inputs

2016-10-27 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4949:
-

 Summary: Refactor Gelly driver inputs
 Key: FLINK-4949
 URL: https://issues.apache.org/jira/browse/FLINK-4949
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan


The Gelly drivers started as simple wrappers around library algorithms but have 
grown to handle a matrix of input sources while often running multiple 
algorithms and analytics with custom parameterization.

This ticket will refactor the sourcing of the input graph into separate classes 
for CSV files and RMat, which will simplify the inclusion of new data sources.





[GitHub] flink pull request #2708: [FLINK-4946] [scripts] Load jar files from subdire...

2016-10-27 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2708

[FLINK-4946] [scripts] Load jar files from subdirectories of lib

The Flink classpath is a concatenation of jar files in lib/. This commit 
includes files from subdirectories of lib/ in the classpath.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4946_load_jar_files_from_subdirectories_of_lib

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2708.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2708


commit 19cb4d5bee2863aedf111c1e6e69475c4205a360
Author: Greg Hogan 
Date:   2016-10-27T19:14:36Z

[FLINK-4946] [scripts] Load jar files from subdirectories of lib

The Flink classpath is a concatenation of jar files in lib/. This commit
includes files from subdirectories of lib/ in the classpath.






[jira] [Commented] (FLINK-4946) Load jar files from subdirectories of lib

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15613223#comment-15613223
 ] 

ASF GitHub Bot commented on FLINK-4946:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2708

[FLINK-4946] [scripts] Load jar files from subdirectories of lib

The Flink classpath is a concatenation of jar files in lib/. This commit 
includes files from subdirectories of lib/ in the classpath.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4946_load_jar_files_from_subdirectories_of_lib

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2708.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2708


commit 19cb4d5bee2863aedf111c1e6e69475c4205a360
Author: Greg Hogan 
Date:   2016-10-27T19:14:36Z

[FLINK-4946] [scripts] Load jar files from subdirectories of lib

The Flink classpath is a concatenation of jar files in lib/. This commit
includes files from subdirectories of lib/ in the classpath.




> Load jar files from subdirectories of lib
> -
>
> Key: FLINK-4946
> URL: https://issues.apache.org/jira/browse/FLINK-4946
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Users can more easily track Flink jars with transitive dependencies when 
> copied into subdirectories of {{lib}}. This is the arrangement of {{opt}} for 
> FLINK-4861.





[jira] [Commented] (FLINK-4948) Consider using checksums or similar to detect bad checkpoints

2016-10-27 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15613064#comment-15613064
 ] 

Jamie Grier commented on FLINK-4948:


Makes sense.  Maybe a scheme where we can verify that the checkpoint is at 
least self-consistent -- using only data stored in the checkpoint itself.

> Consider using checksums or similar to detect bad checkpoints
> -
>
> Key: FLINK-4948
> URL: https://issues.apache.org/jira/browse/FLINK-4948
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jamie Grier
> Fix For: 1.2.0
>
>
> We should consider proactively checking to verify that checkpoints are valid 
> when reading (and maybe writing).  This should help prevent any possible 
> state corruption issues that might otherwise go undetected.





[jira] [Commented] (FLINK-4948) Consider using checksums or similar to detect bad checkpoints

2016-10-27 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612892#comment-15612892
 ] 

Gyula Fora commented on FLINK-4948:
---

I think in general this would be very nice, but what about tools that might 
want to alter checkpoints intentionally for some reason, e.g. cleaning or 
transforming the state? So I think we should aim for some flexible checking 
logic here.

> Consider using checksums or similar to detect bad checkpoints
> -
>
> Key: FLINK-4948
> URL: https://issues.apache.org/jira/browse/FLINK-4948
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jamie Grier
> Fix For: 1.2.0
>
>
> We should consider proactively checking to verify that checkpoints are valid 
> when reading (and maybe writing).  This should help prevent any possible 
> state corruption issues that might otherwise go undetected.





[jira] [Created] (FLINK-4948) Consider using checksums or similar to detect bad checkpoints

2016-10-27 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-4948:
--

 Summary: Consider using checksums or similar to detect bad 
checkpoints
 Key: FLINK-4948
 URL: https://issues.apache.org/jira/browse/FLINK-4948
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Jamie Grier
 Fix For: 1.2.0


We should consider proactively checking to verify that checkpoints are valid 
when reading (and maybe writing).  This should help prevent any possible state 
corruption issues that might otherwise go undetected.
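A minimal self-consistency scheme along these lines (illustrative only; CRC32 stands in for whatever checksum the implementation would actually choose) stores a checksum next to the checkpoint bytes at write time and re-verifies it on read:

```java
import java.util.zip.CRC32;

// Illustrative sketch: a checksum computed at write time is stored with the
// checkpoint and recomputed on restore; a mismatch flags a bad checkpoint.
class CheckpointChecksumSketch {
    static long checksum(byte[] checkpointBytes) {
        CRC32 crc = new CRC32();
        crc.update(checkpointBytes, 0, checkpointBytes.length);
        return crc.getValue();
    }

    static boolean isValid(byte[] checkpointBytes, long storedChecksum) {
        return checksum(checkpointBytes) == storedChecksum;
    }
}
```

Note that this only detects accidental corruption; as discussed above, tools that deliberately rewrite checkpoints would need to recompute the stored checksum, which is one argument for flexible checking logic.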





[jira] [Created] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.

2016-10-27 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-4947:
--

 Summary: Make all configuration possible via flink-conf.yaml and 
CLI.
 Key: FLINK-4947
 URL: https://issues.apache.org/jira/browse/FLINK-4947
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Jamie Grier
 Fix For: 1.2.0


I think it's important to make all configuration possible via the 
flink-conf.yaml and the command line.

As an example:  To enable "externalizedCheckpoints" you must actually call the 
StreamExecutionEnvironment#enableExternalizedCheckpoints() method from your 
Flink program.

Another example of this would be configuring the RocksDB state backend.

I think it is important to make deployment flexible and easy to build tools 
around.  For example, the infrastructure teams that make these configuration 
decisions and provide tools for deploying Flink apps will be different from 
the teams deploying apps.  The team writing apps should not have to set all of 
this lower-level configuration up in their programs.







[jira] [Created] (FLINK-4946) Load jar files from subdirectories of lib

2016-10-27 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4946:
-

 Summary: Load jar files from subdirectories of lib
 Key: FLINK-4946
 URL: https://issues.apache.org/jira/browse/FLINK-4946
 Project: Flink
  Issue Type: Improvement
  Components: Startup Shell Scripts
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor


Users can more easily track Flink jars with transitive dependencies when copied 
into subdirectories of {{lib}}. This is the arrangement of {{opt}} for 
FLINK-4861.
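Expressed in Java terms (the actual change lives in the startup shell scripts; names here are illustrative), the idea is to keep every .jar at any depth under lib/ rather than only top-level entries, joined with the platform path separator:

```java
import java.io.File;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch of the classpath construction: keep only .jar entries
// (at any depth under lib/) and join them into a single classpath string.
class LibClasspathSketch {
    static String joinJars(List<Path> entries) {
        return entries.stream()
                .filter(p -> p.toString().endsWith(".jar"))
                .map(Path::toString)
                .sorted()
                .collect(Collectors.joining(File.pathSeparator));
    }
}
```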





[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts

2016-10-27 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2664
  
@StephanEwen latest commit assembles connectors into separate directories.

I'll create a ticket for loading jar files from subdirectories of `lib`.




[jira] [Commented] (FLINK-4861) Package optional project artifacts

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612777#comment-15612777
 ] 

ASF GitHub Bot commented on FLINK-4861:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2664
  
@StephanEwen latest commit assembles connectors into separate directories.

I'll create a ticket for loading jar files from subdirectories of `lib`.


> Package optional project artifacts
> --
>
> Key: FLINK-4861
> URL: https://issues.apache.org/jira/browse/FLINK-4861
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Per the mailing list 
> [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html],
>  package the Flink libraries and connectors into subdirectories of a new 
> {{opt}} directory in the release/snapshot tarballs.





[GitHub] flink issue #2704: [FLINK-4943] ConfigConstants: YYARN -> YARN

2016-10-27 Thread Makman2
Github user Makman2 commented on the issue:

https://github.com/apache/flink/pull/2704
  
Feel free to cherry-pick my commit into your branch and close this PR :)




[jira] [Commented] (FLINK-4943) flink-mesos/ConfigConstants: Typo: YYARN -> YARN

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612697#comment-15612697
 ] 

ASF GitHub Bot commented on FLINK-4943:
---

Github user Makman2 commented on the issue:

https://github.com/apache/flink/pull/2704
  
Feel free to cherry-pick my commit into your branch and close this PR :)


> flink-mesos/ConfigConstants: Typo: YYARN -> YARN
> 
>
> Key: FLINK-4943
> URL: https://issues.apache.org/jira/browse/FLINK-4943
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Mischa Krüger
>Priority: Trivial
>






[jira] [Commented] (FLINK-4887) Replace ActorGateway by TaskManagerGateway interface

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612673#comment-15612673
 ] 

ASF GitHub Bot commented on FLINK-4887:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2699#discussion_r85386607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+public class FutureUtils {
--- End diff --

There is already a class `FutureUtil` in `flink-core`.
Can you combine this class with that one?


> Replace ActorGateway by TaskManagerGateway interface
> 
>
> Key: FLINK-4887
> URL: https://issues.apache.org/jira/browse/FLINK-4887
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> Instead of accessing the {{ActorGateway}} directly in the {{Execution}} 
> and {{ExecutionVertex}}, it would be better to decouple the two components by 
> introducing the {{TaskGateway}} interface, which provides access to 
> task-related rpc calls. The {{ActorGateway}} could be one implementation of 
> the interface.
> This change prepares for the further implementation of FLIP-6.





[jira] [Commented] (FLINK-4887) Replace ActorGateway by TaskManagerGateway interface

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612675#comment-15612675
 ] 

ASF GitHub Bot commented on FLINK-4887:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2699#discussion_r85383836
  
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --

Leftover change


> Replace ActorGateway by TaskManagerGateway interface
> 
>
> Key: FLINK-4887
> URL: https://issues.apache.org/jira/browse/FLINK-4887
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> Instead of accessing the {{ActorGateway}} directly in the {{Execution}} 
> and {{ExecutionVertex}}, it would be better to decouple the two components by 
> introducing the {{TaskGateway}} interface, which provides access to 
> task-related rpc calls. The {{ActorGateway}} could be one implementation of 
> the interface.
> This change prepares for the further implementation of FLIP-6.





[jira] [Commented] (FLINK-4943) flink-mesos/ConfigConstants: Typo: YYARN -> YARN

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612690#comment-15612690
 ] 

ASF GitHub Bot commented on FLINK-4943:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2704
  
I keep an `_minutiae` branch and push that every few months, typically 
after a release. There are fixed costs at each level of Flink's ticketing / 
pull request / verify / commit process.


> flink-mesos/ConfigConstants: Typo: YYARN -> YARN
> 
>
> Key: FLINK-4943
> URL: https://issues.apache.org/jira/browse/FLINK-4943
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Mischa Krüger
>Priority: Trivial
>






[GitHub] flink issue #2704: [FLINK-4943] ConfigConstants: YYARN -> YARN

2016-10-27 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2704
  
I keep an `_minutiae` branch and push that every few months, typically 
after a release. There are fixed costs at each level of Flink's ticketing / 
pull request / verify / commit process.




[GitHub] flink pull request #2699: [FLINK-4887] [execution graph] Introduce TaskManag...

2016-10-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2699#discussion_r85392726
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java 
---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.Preconditions;
+
+public class StackTrace {
--- End diff --

This also needs to be serializable




[GitHub] flink pull request #2699: [FLINK-4887] [execution graph] Introduce TaskManag...

2016-10-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2699#discussion_r85390985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Response to the TriggerStackTraceSample message.
+ */
+public class StackTraceSampleResponse {
--- End diff --

This needs to be `java.io.Serializable`




[jira] [Commented] (FLINK-4887) Replace ActorGateway by TaskManagerGateway interface

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612672#comment-15612672
 ] 

ASF GitHub Bot commented on FLINK-4887:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2699#discussion_r85392726
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java 
---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.Preconditions;
+
+public class StackTrace {
--- End diff --

This also needs to be serializable


> Replace ActorGateway by TaskManagerGateway interface
> 
>
> Key: FLINK-4887
> URL: https://issues.apache.org/jira/browse/FLINK-4887
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> Instead of accessing the {{ActorGateway}} directly in the {{Execution}} 
> and {{ExecutionVertex}}, it would be better to decouple the two components by 
> introducing the {{TaskGateway}} interface, which provides access to 
> task-related rpc calls. The {{ActorGateway}} could be one implementation of 
> the interface.
> This change prepares for the further implementation of FLIP-6.





[GitHub] flink pull request #2699: [FLINK-4887] [execution graph] Introduce TaskManag...

2016-10-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2699#discussion_r85383836
  
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --

Leftover change


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4887) Replace ActorGateway by TaskManagerGateway interface

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612674#comment-15612674
 ] 

ASF GitHub Bot commented on FLINK-4887:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2699#discussion_r85390985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Response to the TriggerStackTraceSample message.
+ */
+public class StackTraceSampleResponse {
--- End diff --

This needs to be `java.io.Serializable`
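Why serializability matters here: these response messages travel between processes, so they must survive a Java serialization round trip. A self-contained illustration with a stand-in class (not the actual Flink type):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class RoundTrip {
    // Stand-in for a response message; without "implements Serializable" the
    // writeObject call below would throw NotSerializableException.
    static class SampleResponse implements Serializable {
        private static final long serialVersionUID = 1L;
        final int sampleId;
        SampleResponse(int sampleId) { this.sampleId = sampleId; }
    }

    // Serialize and immediately deserialize the message, as an RPC layer would.
    static SampleResponse roundTrip(SampleResponse in) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(in);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (SampleResponse) ois.readObject();
        }
    }
}
```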


> Replace ActorGateway by TaskManagerGateway interface
> 
>
> Key: FLINK-4887
> URL: https://issues.apache.org/jira/browse/FLINK-4887
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> Instead of accessing directly on the {{ActorGateway}} in the {{Execution}} 
> and {{ExecutionVertex}} it would be better to decouple the two components by 
> introducing the {{TaskGateway}} interface which provides access to task 
> related rpc calls. The {{ActorGateway}} could be one implementation of the 
> interface.
> This change will prepare the further implementation of Flip-6.





[GitHub] flink issue #2704: [FLINK-4943] ConfigConstants: YYARN -> YARN

2016-10-27 Thread Makman2
Github user Makman2 commented on the issue:

https://github.com/apache/flink/pull/2704
  
Though I wouldn't fix more than this one (unless I stumble over some typos 
again) :3
Is there already a larger ticket? @greghogan 




[jira] [Commented] (FLINK-4943) flink-mesos/ConfigConstants: Typo: YYARN -> YARN

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612618#comment-15612618
 ] 

ASF GitHub Bot commented on FLINK-4943:
---

Github user Makman2 commented on the issue:

https://github.com/apache/flink/pull/2704
  
Though I wouldn't fix more than this one (unless I stumble over some typos 
again) :3
Is there already a larger ticket? @greghogan 


> flink-mesos/ConfigConstants: Typo: YYARN -> YARN
> 
>
> Key: FLINK-4943
> URL: https://issues.apache.org/jira/browse/FLINK-4943
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Mischa Krüger
>Priority: Trivial
>






[jira] [Commented] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612488#comment-15612488
 ] 

ASF GitHub Bot commented on FLINK-4939:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2707

[FLINK-4939] GenericWriteAheadSink: Decouple the creating from the 
committing subtask for a pending checkpoint

So far the GenericWriteAheadSink expected that the subtask that wrote a 
temporary buffer to the state backend upon checkpointing would also be the one 
to commit it to the third-party storage system.

This commit removes this assumption. To do so, it changes the 
CheckpointCommitter to take the subtaskIdx as a parameter both when committing 
something to the third-party storage system ( [void 
commitCheckpoint(int subtaskIdx, long checkpointID)] ) and when asking
whether a checkpoint was committed ( [boolean isCheckpointCommitted(int 
subtaskIdx, long checkpointID)] ), and it also extends the state kept by the 
[GenericWriteAheadSink] to include the index of the subtask that wrote the 
pending buffer. 
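The two method signatures quoted above suggest a committer shaped roughly like the following sketch: a simplified stand-in keyed by (subtaskIdx, checkpointID), not Flink's actual CheckpointCommitter, which talks to a real storage backend.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for the described change: commit and lookup are both
// keyed by (subtaskIdx, checkpointID), so any subtask can commit or check a
// buffer that was written by any other subtask.
class SubtaskAwareCommitter {
    private final Set<String> committed = new HashSet<>();

    void commitCheckpoint(int subtaskIdx, long checkpointID) {
        committed.add(subtaskIdx + ":" + checkpointID);
    }

    boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) {
        return committed.contains(subtaskIdx + ":" + checkpointID);
    }
}
```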

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink write_ahead_sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2707.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2707


commit 207c5239c9fab6ef09b7bdd410ee83d3d8ca105f
Author: kl0u 
Date:   2016-10-26T15:19:12Z

[FLINK-4939] GenericWriteAheadSink: Decouple the creating from the 
committing subtask for a pending checkpoint

So far the GenericWriteAheadSink expected that
the subtask that wrote a temporary buffer to the
state backend, will be also the one to commit it to
the third-party storage system.

This commit removes this assumption. To do this
it changes the CheckpointCommitter to dynamically
take the subtaskIdx as a parameter when asking
if a checkpoint was committed and also changes the
state kept by the GenericWriteAheadSink to also
include that subtask index of the subtask that wrote
the pending buffer.




> GenericWriteAheadSink: Decouple the creating from the committing subtask for 
> a pending checkpoint
> -
>
> Key: FLINK-4939
> URL: https://issues.apache.org/jira/browse/FLINK-4939
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> So far the GenericWriteAheadSink expected that
> the subtask that wrote a pending checkpoint to the 
> state backend would also be the one to commit it to
> the third-party storage system.
> This issue targets removing this assumption. To do this, 
> the CheckpointCommitter has to be able to
> take the subtaskIdx as a parameter when asking 
> if a checkpoint was committed, and the
> state kept by the GenericWriteAheadSink has to be extended to 
> include the index of the subtask that wrote 
> the pending checkpoint.
> This change is also necessary for making the operator rescalable.





[GitHub] flink pull request #2707: [FLINK-4939] GenericWriteAheadSink: Decouple the c...

2016-10-27 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2707

[FLINK-4939] GenericWriteAheadSink: Decouple the creating from the 
committing subtask for a pending checkpoint

So far the GenericWriteAheadSink expected that the subtask that wrote a 
temporary buffer to the state backend upon checkpointing would also be the one 
to commit it to the third-party storage system.

This commit removes this assumption. To do so, it changes the 
CheckpointCommitter to take the subtaskIdx as a parameter both when committing 
something to the third-party storage system ( [void 
commitCheckpoint(int subtaskIdx, long checkpointID)] ) and when asking
whether a checkpoint was committed ( [boolean isCheckpointCommitted(int 
subtaskIdx, long checkpointID)] ), and it also extends the state kept by the 
[GenericWriteAheadSink] to include the index of the subtask that wrote the 
pending buffer. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink write_ahead_sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2707.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2707


commit 207c5239c9fab6ef09b7bdd410ee83d3d8ca105f
Author: kl0u 
Date:   2016-10-26T15:19:12Z

[FLINK-4939] GenericWriteAheadSink: Decouple the creating from the 
committing subtask for a pending checkpoint

So far the GenericWriteAheadSink expected that
the subtask that wrote a temporary buffer to the
state backend, will be also the one to commit it to
the third-party storage system.

This commit removes this assumption. To do this
it changes the CheckpointCommitter to dynamically
take the subtaskIdx as a parameter when asking
if a checkpoint was committed and also changes the
state kept by the GenericWriteAheadSink to also
include that subtask index of the subtask that wrote
the pending buffer.






[jira] [Commented] (FLINK-4889) Make InstanceManager independent of ActorRef

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612480#comment-15612480
 ] 

ASF GitHub Bot commented on FLINK-4889:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2698


> Make InstanceManager independent of ActorRef
> 
>
> Key: FLINK-4889
> URL: https://issues.apache.org/jira/browse/FLINK-4889
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Currently, the {{InstanceManager}} strongly depends on {{ActorRef}}, because 
> it uses the actor refs to distinguish task managers. I propose to make the 
> {{InstanceManager}} independent of these {{ActorRefs}} because they are 
> implementation dependent (see Flip-6).





[GitHub] flink pull request #2698: [FLINK-4889] [InstanceManager] Remove ActorRef dep...

2016-10-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2698




[jira] [Commented] (FLINK-4945) KafkaConsumer logs wrong warning about confirmation for unknown checkpoint

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612477#comment-15612477
 ] 

ASF GitHub Bot commented on FLINK-4945:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2706
  
R @rmetzger 


> KafkaConsumer logs wrong warning about confirmation for unknown checkpoint
> --
>
> Key: FLINK-4945
> URL: https://issues.apache.org/jira/browse/FLINK-4945
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> Checkpoints are currently not registered in all cases. While the code still 
> behaves correctly this leads to misleading warnings.





[GitHub] flink issue #2706: [FLINK-4945] KafkaConsumer logs wrong warning about confi...

2016-10-27 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2706
  
R @rmetzger 




[jira] [Updated] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

2016-10-27 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-4939:
--
Summary: GenericWriteAheadSink: Decouple the creating from the committing 
subtask for a pending checkpoint  (was: GenericWriteAheadSink: Decouple the 
subtask creating a pending checkpoint from the one commiting it.)

> GenericWriteAheadSink: Decouple the creating from the committing subtask for 
> a pending checkpoint
> -
>
> Key: FLINK-4939
> URL: https://issues.apache.org/jira/browse/FLINK-4939
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> So far the GenericWriteAheadSink expected that
> the subtask that wrote a pending checkpoint to the 
> state backend would also be the one to commit it to
> the third-party storage system.
> This issue targets removing this assumption. To do this, 
> the CheckpointCommitter has to be able to
> take the subtaskIdx as a parameter when asking 
> if a checkpoint was committed, and the
> state kept by the GenericWriteAheadSink has to be extended to 
> include the index of the subtask that wrote 
> the pending checkpoint.
> This change is also necessary for making the operator rescalable.





[jira] [Commented] (FLINK-4945) KafkaConsumer logs wrong warning about confirmation for unknown checkpoint

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612472#comment-15612472
 ] 

ASF GitHub Bot commented on FLINK-4945:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2706

[FLINK-4945] KafkaConsumer logs wrong warning about confirmation for …

This PR fixes an unjustified warning about unknown checkpoints in 
FlinkKafkaConsumerBase.
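The fix can be illustrated with a generic pattern: keep a bounded record of checkpoint IDs that were actually triggered, so that a completion notification for a genuinely unknown ID can be told apart from one whose entry was merely evicted. This is a hypothetical sketch of that pattern, not the actual FlinkKafkaConsumerBase code; the cap value is an assumption.

```java
import java.util.ArrayDeque;

// Hypothetical sketch: remember every checkpoint ID that was triggered, so a
// later completion notification for an ID we never saw can be distinguished
// from one we already discarded. Names and the cap are illustrative.
class PendingCheckpoints {
    private static final int MAX_PENDING = 100;  // assumed bound, keeps memory constant
    private final ArrayDeque<Long> pending = new ArrayDeque<>();

    void onTrigger(long checkpointId) {
        if (pending.size() >= MAX_PENDING) {
            pending.removeFirst();  // evict the oldest entry
        }
        pending.addLast(checkpointId);
    }

    /** Returns true if the completed checkpoint was known, so no warning is needed. */
    boolean onComplete(long checkpointId) {
        return pending.remove(checkpointId);
    }
}
```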

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink fix-kafka-warning

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2706.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2706






> KafkaConsumer logs wrong warning about confirmation for unknown checkpoint
> --
>
> Key: FLINK-4945
> URL: https://issues.apache.org/jira/browse/FLINK-4945
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> Checkpoints are currently not registered in all cases. While the code still 
> behaves correctly this leads to misleading warnings.





[GitHub] flink pull request #2706: [FLINK-4945] KafkaConsumer logs wrong warning abou...

2016-10-27 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2706

[FLINK-4945] KafkaConsumer logs wrong warning about confirmation for …

This PR fixes an unjustified warning about unknown checkpoints in 
FlinkKafkaConsumerBase.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink fix-kafka-warning

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2706.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2706








[jira] [Updated] (FLINK-4939) GenericWriteAheadSink: Decouple the subtask creating a pending checkpoint from the one commiting it.

2016-10-27 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-4939:
--
Summary: GenericWriteAheadSink: Decouple the subtask creating a pending 
checkpoint from the one commiting it.  (was: GenericWriteAheadSink: Decouple 
the subtask that created a pending checkpoint from the one that commits it.)

> GenericWriteAheadSink: Decouple the subtask creating a pending checkpoint 
> from the one commiting it.
> 
>
> Key: FLINK-4939
> URL: https://issues.apache.org/jira/browse/FLINK-4939
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> So far the GenericWriteAheadSink expected that
> the subtask that wrote a pending checkpoint to the 
> state backend would also be the one to commit it to
> the third-party storage system.
> This issue targets removing this assumption. To do this, 
> the CheckpointCommitter has to be able to
> take the subtaskIdx as a parameter when asking 
> if a checkpoint was committed, and the
> state kept by the GenericWriteAheadSink has to be extended to 
> include the index of the subtask that wrote 
> the pending checkpoint.
> This change is also necessary for making the operator rescalable.





[jira] [Commented] (FLINK-4823) org.apache.flink.types.NullFieldException: Field 0 is null, but expected to hold a value

2016-10-27 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612467#comment-15612467
 ] 

Greg Hogan commented on FLINK-4823:
---

How are you creating your {{Graph}}? Each vertex represented in the edge set 
should be in the vertex set.

Based on this exception we do need to add better error checking with a more 
specific log message.
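The invariant described here can be checked directly before constructing the graph; the helper below is a plain-Java sketch of that check with illustrative names (Gelly also provides graph validation utilities for this purpose), not Gelly's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Plain-Java sketch of the invariant: every vertex id referenced by an edge
// must also appear in the vertex set; otherwise joins against the vertex set
// produce null fields. Each edge is a {source, target} pair of vertex ids.
class EdgeEndpointCheck {
    static List<long[]> danglingEdges(Set<Long> vertexIds, List<long[]> edges) {
        List<long[]> bad = new ArrayList<>();
        for (long[] e : edges) {
            if (!vertexIds.contains(e[0]) || !vertexIds.contains(e[1])) {
                bad.add(e);  // at least one endpoint is missing from the vertex set
            }
        }
        return bad;
    }
}
```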

> org.apache.flink.types.NullFieldException: Field 0 is null, but expected to 
> hold a value
> 
>
> Key: FLINK-4823
> URL: https://issues.apache.org/jira/browse/FLINK-4823
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.0
> Environment: RHEL 6.6
>Reporter: Sajeev Ramakrishnan
>
> Team,
>   We are getting NULL pointer exception while doing the vertex centric graph 
> traversal.
> org.apache.flink.types.NullFieldException: Field 0 is null, but expected to 
> hold a value.
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:126)
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
> at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> at 
> org.apache.flink.api.java.operators.JoinOperator$DefaultJoin$WrappingFlatJoinFunction.join(JoinOperator.java:572)
> at 
> org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver.run(JoinWithSolutionSetFirstDriver.java:196)
> at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at 
> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> at 
> org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
> at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:122)
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
> ... 13 more
> None of the parameters that I am passing for the vertices and edges are null. 
> I am not able to find the root cause.
> Thanks & Regards,
> Sajeev Ramakrishnan





[jira] [Commented] (FLINK-4889) Make InstanceManager independent of ActorRef

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612452#comment-15612452
 ] 

ASF GitHub Bot commented on FLINK-4889:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2698
  
Looks good
+1 to merge this


> Make InstanceManager independent of ActorRef
> 
>
> Key: FLINK-4889
> URL: https://issues.apache.org/jira/browse/FLINK-4889
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Currently, the {{InstanceManager}} strongly depends on {{ActorRef}}, because 
> it uses the actor refs to distinguish task managers. I propose to make the 
> {{InstanceManager}} independent of these {{ActorRefs}} because they are 
> implementation dependent (see Flip-6).





[GitHub] flink issue #2698: [FLINK-4889] [InstanceManager] Remove ActorRef dependency...

2016-10-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2698
  
Looks good
+1 to merge this




[jira] [Assigned] (FLINK-4945) KafkaConsumer logs wrong warning about confirmation for unknown checkpoint

2016-10-27 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-4945:
-

Assignee: Stefan Richter

> KafkaConsumer logs wrong warning about confirmation for unknown checkpoint
> --
>
> Key: FLINK-4945
> URL: https://issues.apache.org/jira/browse/FLINK-4945
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> Checkpoints are currently not registered in all cases. While the code still 
> behaves correctly this leads to misleading warnings.





[jira] [Created] (FLINK-4945) KafkaConsumer logs wrong warning about confirmation for unknown checkpoint

2016-10-27 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-4945:
-

 Summary: KafkaConsumer logs wrong warning about confirmation for 
unknown checkpoint
 Key: FLINK-4945
 URL: https://issues.apache.org/jira/browse/FLINK-4945
 Project: Flink
  Issue Type: Bug
Reporter: Stefan Richter
Priority: Minor


Checkpoints are currently not registered in all cases. While the code still 
behaves correctly this leads to misleading warnings.





[jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612315#comment-15612315
 ] 

ASF GitHub Bot commented on FLINK-2608:
---

Github user chermenin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2623#discussion_r85369265
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
 ---
@@ -710,7 +712,103 @@ public String toString() {
pwc2.bigInt = 
BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;
-   
+
+   GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+   pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 
1976
+
+
+   data.add(pwc1);
+   data.add(pwc2);
+
+   return env.fromCollection(data);
+   }
+
+   public static DataSet 
getPojoWithArraysAsListCollection(ExecutionEnvironment env) {
--- End diff --

I think it can be left, just to be on the safe side.


> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the 
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> text = env.fromElements(
>             "Who's there?",
>             "I think I hear them. Stand, ho! Who's there?");
>         // DOES NOT WORK
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works:
>         // List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>             .flatMap(new LineSplitter())
>             .withBroadcastSet(set, "set")
>             .groupBy(0)
>             .sum(1);
>         wordCounts.print();
>     }
>
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>         List<Integer> integerList;
>
>         public TestClass(List<Integer> integerList) {
>             this.integerList = integerList;
>         }
>     }
> }
> {code}
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at main(Test.java:32) 
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the 
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at 
> 
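The underlying pitfall is visible outside Flink: `Arrays.asList` does not return a `java.util.ArrayList` but the private class `java.util.Arrays$ArrayList`, which serializers registered only for the public collection classes do not cover. Copying into a real `ArrayList` (the commented-out workaround in the report) sidesteps this; the Chill upgrade in this PR presumably addresses it on the serializer side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AsListClassDemo {
    public static void main(String[] args) {
        List<Integer> asList = Arrays.asList(0, 0, 0);
        List<Integer> copy = new ArrayList<>(asList);

        // Arrays.asList returns a private fixed-size list view, not ArrayList.
        System.out.println(asList.getClass().getName()); // java.util.Arrays$ArrayList
        System.out.println(copy.getClass().getName());   // java.util.ArrayList
    }
}
```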

[GitHub] flink pull request #2623: [FLINK-2608] Updated Twitter Chill version.

2016-10-27 Thread chermenin
Github user chermenin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2623#discussion_r85369265
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
 ---
@@ -710,7 +712,103 @@ public String toString() {
pwc2.bigInt = 
BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;
-   
+
+   GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+   pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 
1976
+
+
+   data.add(pwc1);
+   data.add(pwc2);
+
+   return env.fromCollection(data);
+   }
+
+   public static DataSet 
getPojoWithArraysAsListCollection(ExecutionEnvironment env) {
--- End diff --

I think it can be left, just to be on the safe side.




[jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612306#comment-15612306
 ] 

ASF GitHub Bot commented on FLINK-2608:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2623
  
I like this approach. One minor question, otherwise +1 to go here


> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the 
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
>                 ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them. Stand, ho! Who's there?");
>         // DOES NOT WORK
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works:
>         //List<Integer> elements = new ArrayList<>(new int[] {0,0,0});
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>                 .flatMap(new LineSplitter())
>                 .withBroadcastSet(set, "set")
>                 .groupBy(0)
>                 .sum(1);
>         wordCounts.print();
>     }
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>         List<Integer> integerList;
>         public TestClass(List<Integer> integerList) {
>             this.integerList = integerList;
>         }
>     }
> }
> {code}
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at main(Test.java:32) 
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the 
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> 
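The failure above comes down to the private list class that Arrays.asList returns, which the Kryo/Chill serializers used for the InputFormat could not deserialize. A minimal standalone sketch (plain Java only; class name and comments are mine, not from this thread) of the class-identity difference behind the workaround hinted at in the description:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AsListWorkaround {
    public static void main(String[] args) {
        // Arrays.asList returns the private java.util.Arrays$ArrayList view,
        // which the serializer stack could not handle when deploying the task.
        List<Integer> view = Arrays.asList(0, 0, 0);
        System.out.println(view.getClass().getName());

        // Workaround: copy into a plain java.util.ArrayList, a public class
        // that standard serializers handle out of the box.
        List<Integer> copy = new ArrayList<>(view);
        System.out.println(copy.getClass().getName());
    }
}
```

The PR itself fixes the root cause by upgrading the Twitter Chill version, so the copy is only needed on older versions.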

[GitHub] flink issue #2623: [FLINK-2608] Updated Twitter Chill version.

2016-10-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2623
  
I like this approach. One minor question, otherwise +1 to go here




[jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612301#comment-15612301
 ] 

ASF GitHub Bot commented on FLINK-2608:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2623#discussion_r85368023
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
 ---
@@ -710,7 +712,103 @@ public String toString() {
pwc2.bigInt = 
BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;
-   
+
+   GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+   pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 
1976
+
+
+   data.add(pwc1);
+   data.add(pwc2);
+
+   return env.fromCollection(data);
+   }
+
+   public static DataSet 
getPojoWithArraysAsListCollection(ExecutionEnvironment env) {
--- End diff --

Are these still needed by the tests?


> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the 
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
>                 ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them. Stand, ho! Who's there?");
>         // DOES NOT WORK
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works:
>         //List<Integer> elements = new ArrayList<>(new int[] {0,0,0});
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>                 .flatMap(new LineSplitter())
>                 .withBroadcastSet(set, "set")
>                 .groupBy(0)
>                 .sum(1);
>         wordCounts.print();
>     }
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>         List<Integer> integerList;
>         public TestClass(List<Integer> integerList) {
>             this.integerList = integerList;
>         }
>     }
> }
> {code}
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at main(Test.java:32) 
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the 
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at 
> 

[GitHub] flink pull request #2623: [FLINK-2608] Updated Twitter Chill version.

2016-10-27 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2623#discussion_r85368023
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
 ---
@@ -710,7 +712,103 @@ public String toString() {
pwc2.bigInt = 
BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;
-   
+
+   GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+   pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 
1976
+
+
+   data.add(pwc1);
+   data.add(pwc2);
+
+   return env.fromCollection(data);
+   }
+
+   public static DataSet 
getPojoWithArraysAsListCollection(ExecutionEnvironment env) {
--- End diff --

Are these still needed by the tests?




[jira] [Commented] (FLINK-4733) Port WebFrontend to new metric system

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612284#comment-15612284
 ] 

ASF GitHub Bot commented on FLINK-4733:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2616
  
I tested the change locally, it works. +1 to merge.


> Port WebFrontend to new metric system
> -
>
> Key: FLINK-4733
> URL: https://issues.apache.org/jira/browse/FLINK-4733
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, TaskManager, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0
>
>
> While the WebFrontend has access to the metric system it still relies on 
> older code in some parts.
> The TaskManager metrics are still gathered using the Codahale library and 
> send with the heartbeats.
> Task related metrics (numRecordsIn etc) are still gathered using 
> accumulators, which are accessed through the execution graph.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2616: [FLINK-4733] Port WebInterface to metric system

2016-10-27 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2616
  
I tested the change locally, it works. +1 to merge.




[jira] [Commented] (FLINK-4552) Refactor WindowOperator/Trigger Tests

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612219#comment-15612219
 ] 

ASF GitHub Bot commented on FLINK-4552:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85361208
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
 ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness<T, W extends Window> {
+
+   private static final Integer KEY = 1;
+
+   private final Trigger<T, W> trigger;
+   private final TypeSerializer<W> windowSerializer;
+
+   private final HeapKeyedStateBackend<Integer> stateBackend;
+   private final TestInternalTimerService<Integer, W> internalTimerService;
+
+   public TriggerTestHarness(
+   Trigger<T, W> trigger,
+   TypeSerializer<W> windowSerializer) throws Exception {
+   this.trigger = trigger;
+   this.windowSerializer = windowSerializer;
+
+   // we only ever use one key, other tests make sure that windows 
work across different
+   // keys
+   DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+   MemoryStateBackend backend = new MemoryStateBackend();
+
+   @SuppressWarnings("unchecked")
+   HeapKeyedStateBackend<Integer> stateBackend = 
(HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
+   new JobID(),
+   "test_op",
+   IntSerializer.INSTANCE,
+   1,
+   new KeyGroupRange(0, 0),
+   new KvStateRegistry().createTaskRegistry(new 
JobID(), new JobVertexID()));
+   this.stateBackend = stateBackend;
+
+   this.stateBackend.setCurrentKey(0);
+
+   this.internalTimerService = new TestInternalTimerService<>(new 
KeyContext() {
+   @Override
+   

[GitHub] flink pull request #2690: [FLINK-4894] [network] Don't request buffer after ...

2016-10-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2690




[jira] [Commented] (FLINK-4552) Refactor WindowOperator/Trigger Tests

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612217#comment-15612217
 ] 

ASF GitHub Bot commented on FLINK-4552:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85360067
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsWindowsTest.java
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link EventTimeSessionWindows}
+ */
+public class EventTimeSessionWindowsWindowsTest extends TestLogger {
+
+   @Test
+   public void testWindowAssignment() {
+   WindowAssigner.WindowAssignerContext mockContext =
+   
mock(WindowAssigner.WindowAssignerContext.class);
+
+   EventTimeSessionWindows assigner = 
EventTimeSessionWindows.withGap(Time.milliseconds(5000));
--- End diff --

Very minor, but I think a named constant for the value 5000 might make the 
contracts even clearer (e.g. int gap = 5000); you could then add the gap to the 
window start time to obtain the end time.
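
The reviewer's suggestion above can be sketched as follows (values and names are illustrative, not taken from the PR):

```java
public class NamedGapExample {
    public static void main(String[] args) {
        // Name the session gap once instead of repeating the literal 5000.
        final long gap = 5000; // session gap in milliseconds
        long start = 0;
        // Derive the expected window end from the named gap, making the
        // relationship between start, gap, and end explicit in the test.
        long end = start + gap;
        System.out.println("window [" + start + ", " + end + ")");
    }
}
```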


> Refactor WindowOperator/Trigger Tests
> -
>
> Key: FLINK-4552
> URL: https://issues.apache.org/jira/browse/FLINK-4552
> Project: Flink
>  Issue Type: Improvement
>  Components: Windowing Operators
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and 
> {{WindowFunction}} are all conflated in {{WindowOperatorTest}}. All of these 
> test that a certain combination of a {{Trigger}}, {{WindowAssigner}} and 
> {{WindowFunction}} produce the expected output.
> We should modularize these tests and spread them out across multiple files, 
> possibly one per trigger, for the triggers. Also, we should extend/change the 
> tests in some key ways:
>  - {{WindowOperatorTest}} test should just verify that the interaction 
> between {{WindowOperator}} and the various other parts works as expected, 
> that the correct methods on {{Trigger}} and {{WindowFunction}} are called at 
> the expected time and that snapshotting, timers, cleanup etc. work correctly. 
> These tests should also verify that the different state types and 
> {{WindowFunctions}} work correctly.
>  - {{Trigger}} tests should present elements to triggers and verify that they 
> fire at the correct times. The actual output of the {{WindowFunction}} is not 
> important for these tests. We should also test that triggers correctly 

[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612273#comment-15612273
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2652


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0, 1.1.4
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.
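
The safety net described above can be sketched as a watchdog timer; class and method names here are illustrative stand-ins, not Flink's actual implementation:

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CancellationWatchdog {
    public static void main(String[] args) throws Exception {
        CountDownLatch cancelled = new CountDownLatch(1);

        // Arm a watchdog when cancellation starts; if the task does not
        // confirm within the timeout, escalate to a process-level fatal
        // error (in the real TaskManager this would end in a hard exit).
        Timer watchdog = new Timer("cancellation-watchdog", true);
        watchdog.schedule(new TimerTask() {
            @Override
            public void run() {
                System.err.println("FATAL: task stuck in cancellation, killing TaskManager");
            }
        }, 100 /* ms timeout, illustrative */);

        // Simulate user code that does cancel in time.
        cancelled.countDown();
        if (cancelled.await(50, TimeUnit.MILLISECONDS)) {
            watchdog.cancel(); // disarm: cancellation succeeded
            System.out.println("cancellation confirmed, watchdog disarmed");
        }
    }
}
```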





[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...

2016-10-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2652




[jira] [Closed] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-27 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-4715.
--
   Resolution: Fixed
Fix Version/s: 1.1.4

Fixed in cc6655b (release-1.1), 27fd249 (master).

> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0, 1.1.4
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.





[jira] [Commented] (FLINK-4894) Don't block on buffer request after broadcastEvent

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612253#comment-15612253
 ] 

ASF GitHub Bot commented on FLINK-4894:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2690


> Don't block on buffer request after broadcastEvent 
> ---
>
> Key: FLINK-4894
> URL: https://issues.apache.org/jira/browse/FLINK-4894
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0, 1.1.4
>
>
> After broadcasting an event (like the checkpoint barrier), the record writer 
> might block on a buffer request although that buffer will only be needed on 
> the next write on that channel.
> Instead of assuming that each serializer has a buffer set, we can change the 
> logic in the writer to request the buffer when it requires one.
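
The proposed change can be sketched as follows; the class and its methods are illustrative stand-ins, not Flink's actual RecordWriter API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class LazyBufferWriter {
    private final Deque<byte[]> bufferPool = new ArrayDeque<>();
    private byte[] currentBuffer; // stays null until a write actually needs one

    public LazyBufferWriter(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            bufferPool.add(new byte[32]);
        }
    }

    public void broadcastEvent() {
        // Flush and give up the current buffer, but do NOT block on
        // requesting a replacement here.
        currentBuffer = null;
    }

    public void write(byte[] record) {
        if (currentBuffer == null) {
            // Request a buffer only when a record is actually written
            // (this is where real code would block on the pool).
            currentBuffer = bufferPool.poll();
        }
        System.arraycopy(record, 0, currentBuffer, 0, record.length);
    }

    public boolean holdsBuffer() {
        return currentBuffer != null;
    }

    public static void main(String[] args) {
        LazyBufferWriter w = new LazyBufferWriter(2);
        w.broadcastEvent();
        System.out.println("after event: holdsBuffer=" + w.holdsBuffer());
        w.write(new byte[] {1, 2, 3});
        System.out.println("after write: holdsBuffer=" + w.holdsBuffer());
    }
}
```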





[jira] [Closed] (FLINK-4894) Don't block on buffer request after broadcastEvent

2016-10-27 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-4894.
--
   Resolution: Fixed
Fix Version/s: 1.1.4
   1.2.0

Fixed in 529534f (release-1.1), cbdb784 (master).

> Don't block on buffer request after broadcastEvent 
> ---
>
> Key: FLINK-4894
> URL: https://issues.apache.org/jira/browse/FLINK-4894
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0, 1.1.4
>
>
> After broadcasting an event (like the checkpoint barrier), the record writer 
> might block on a buffer request although that buffer will only be needed on 
> the next write on that channel.
> Instead of assuming that each serializer has a buffer set, we can change the 
> logic in the writer to request the buffer when it requires one.





[jira] [Reopened] (FLINK-4510) Always create CheckpointCoordinator

2016-10-27 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi reopened FLINK-4510:


> Always create CheckpointCoordinator
> ---
>
> Key: FLINK-4510
> URL: https://issues.apache.org/jira/browse/FLINK-4510
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Jark Wu
> Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator is only created if a checkpointing interval is 
> configured. This means that no savepoints can be triggered if there is no 
> checkpointing interval specified.
> Instead we should always create it and allow an interval of 0 for disabled 
> periodic checkpoints. 
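
The proposal can be sketched like this; class and method names are illustrative, not Flink's real CheckpointCoordinator API:

```java
public class CheckpointCoordinatorSketch {
    private final long checkpointInterval; // <= 0 means periodic checkpoints disabled

    public CheckpointCoordinatorSketch(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }

    public boolean periodicCheckpointsEnabled() {
        return checkpointInterval > 0;
    }

    public String triggerSavepoint() {
        // Savepoints work regardless of the periodic interval, because the
        // coordinator now always exists.
        return "savepoint-triggered";
    }

    public static void main(String[] args) {
        // Interval 0: no periodic checkpoints, but savepoints still possible.
        CheckpointCoordinatorSketch coord = new CheckpointCoordinatorSketch(0);
        System.out.println("periodic=" + coord.periodicCheckpointsEnabled());
        System.out.println(coord.triggerSavepoint());
    }
}
```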





[jira] [Commented] (FLINK-4510) Always create CheckpointCoordinator

2016-10-27 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612226#comment-15612226
 ] 

Ufuk Celebi commented on FLINK-4510:


Fixed in {{b420750}} (release-1.1).

> Always create CheckpointCoordinator
> ---
>
> Key: FLINK-4510
> URL: https://issues.apache.org/jira/browse/FLINK-4510
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Jark Wu
> Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator is only created if a checkpointing interval is 
> configured. This means that no savepoints can be triggered if there is no 
> checkpointing interval specified.
> Instead we should always create it and allow an interval of 0 for disabled 
> periodic checkpoints. 





[jira] [Closed] (FLINK-4510) Always create CheckpointCoordinator

2016-10-27 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-4510.
--
   Resolution: Fixed
Fix Version/s: 1.1.4

> Always create CheckpointCoordinator
> ---
>
> Key: FLINK-4510
> URL: https://issues.apache.org/jira/browse/FLINK-4510
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Jark Wu
> Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator is only created if a checkpointing interval is 
> configured. This means that no savepoints can be triggered if there is no 
> checkpointing interval specified.
> Instead we should always create it and allow an interval of 0 for disabled 
> periodic checkpoints. 





[jira] [Commented] (FLINK-4552) Refactor WindowOperator/Trigger Tests

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1561#comment-1561
 ] 

ASF GitHub Bot commented on FLINK-4552:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85361303
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
 ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness<T, W extends Window> {
+
+   private static final Integer KEY = 1;
+
+   private final Trigger<T, W> trigger;
+   private final TypeSerializer<W> windowSerializer;
+
+   private final HeapKeyedStateBackend<Integer> stateBackend;
+   private final TestInternalTimerService<Integer, W> internalTimerService;
+
+   public TriggerTestHarness(
+   Trigger<T, W> trigger,
+   TypeSerializer<W> windowSerializer) throws Exception {
+   this.trigger = trigger;
+   this.windowSerializer = windowSerializer;
+
+   // we only ever use one key, other tests make sure that windows 
work across different
+   // keys
+   DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+   MemoryStateBackend backend = new MemoryStateBackend();
+
+   @SuppressWarnings("unchecked")
+   HeapKeyedStateBackend stateBackend = 
(HeapKeyedStateBackend) backend.createKeyedStateBackend(dummyEnv,
+   new JobID(),
+   "test_op",
+   IntSerializer.INSTANCE,
+   1,
+   new KeyGroupRange(0, 0),
+   new KvStateRegistry().createTaskRegistry(new 
JobID(), new JobVertexID()));
+   this.stateBackend = stateBackend;
+
+   this.stateBackend.setCurrentKey(0);
+
+   this.internalTimerService = new TestInternalTimerService<>(new 
KeyContext() {
+   @Override
+   

[jira] [Commented] (FLINK-4552) Refactor WindowOperator/Trigger Tests

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612221#comment-15612221
 ] 

ASF GitHub Bot commented on FLINK-4552:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85360454
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link EventTimeTrigger}.
+ */
+public class EventTimeTriggerTest {
+
+   /**
+* Verify that state of separate windows does not leak into other 
windows.
+*/
+   @Test
+   public void testWindowSeparationAndFiring() throws Exception {
+   TriggerTestHarness<Object, TimeWindow> testHarness =
+   new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer());
+
+   // inject several elements
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+   assertEquals(0, testHarness.numStateEntries());
+   assertEquals(0, testHarness.numProcessingTimeTimers());
+   assertEquals(2, testHarness.numEventTimeTimers());
+   assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+   assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+   assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(2, new TimeWindow(0, 2)));
+
+   assertEquals(0, testHarness.numStateEntries());
+   assertEquals(0, testHarness.numProcessingTimeTimers());
+   assertEquals(1, testHarness.numEventTimeTimers());
+   assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2)));
+   assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4)));
+
+   assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(4, new TimeWindow(2, 4)));
+
--- End diff --

We could also check that multiple triggers fire when a watermark surpasses 
them all at once (and maybe not all of them if we use 3 triggers for this), to 
catch corner cases.
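The scenario suggested above can be sketched without any Flink dependencies. The harness below is a hypothetical stand-in: the method names (`registerEventTimeTimer`, `advanceWatermark`) mirror the test harness under review, but nothing here is real Flink API. It registers one event-time timer per window and checks that a single watermark fires exactly the timers it surpasses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Minimal, Flink-free sketch of the reviewer's idea: register one event-time
// timer per window, advance the watermark past several of them at once, and
// verify that exactly those timers fire while later ones stay pending.
public class MultiWindowWatermarkSketch {

    // timer timestamp -> window label
    private final TreeMap<Long, String> timers = new TreeMap<>();

    void registerEventTimeTimer(long timestamp, String window) {
        timers.put(timestamp, window);
    }

    // fires every timer whose timestamp is <= the new watermark and
    // removes it from the pending set, returning the fired windows in order
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>(timers.headMap(watermark, true).values());
        timers.headMap(watermark, true).clear();
        return fired;
    }

    public static void main(String[] args) {
        MultiWindowWatermarkSketch harness = new MultiWindowWatermarkSketch();
        harness.registerEventTimeTimer(2, "W(0,2)");
        harness.registerEventTimeTimer(4, "W(2,4)");
        harness.registerEventTimeTimer(6, "W(4,6)");

        // one watermark surpasses the first two windows, but not the third
        System.out.println(harness.advanceWatermark(5)); // prints [W(0,2), W(2,4)]
        System.out.println(harness.timers.size());       // prints 1
    }
}
```

Using three timers and a watermark that surpasses only two of them covers both corner cases at once: batched firing and a timer that must remain pending.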



> Refactor WindowOperator/Trigger Tests
> -
>
> Key: FLINK-4552
> URL: https://issues.apache.org/jira/browse/FLINK-4552
> Project: Flink
>  Issue Type: Improvement
>  Components: Windowing Operators
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and 
> {{WindowFunction}} are all conflated in {{WindowOperatorTest}}. All of these 
> test that a certain combination of a {{Trigger}}, {{WindowAssigner}} and 
> {{WindowFunction}} produce the 

[jira] [Commented] (FLINK-4552) Refactor WindowOperator/Trigger Tests

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612215#comment-15612215
 ] 

ASF GitHub Bot commented on FLINK-4552:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85359406
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+   /**
+* Verify that state of separate windows does not leak into other 
windows.
+*/
+   @Test
+   public void testWindowSeparationAndFiring() throws Exception {
+   TriggerTestHarness<Object, TimeWindow> testHarness =
+   new TriggerTestHarness<>(CountTrigger.of(3), new TimeWindow.Serializer());
+
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+   // shouldn't have any timers
+   assertEquals(0, testHarness.numProcessingTimeTimers());
+   assertEquals(0, testHarness.numEventTimeTimers());
+
+   assertEquals(2, testHarness.numStateEntries());
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+   // right now, CountTrigger will clear its state in onElement when firing
+   // ideally, this should be moved to onFire()
+   assertEquals(1, testHarness.numStateEntries());
+   assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+   }
--- End diff --

You could also let W(2,4) fire for completeness, to verify that its count was 
not reset by the previous firing.
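The point above can be sketched without Flink: keep a per-window counter, fire at a threshold of three, and confirm that firing one window leaves the other window's count intact. `CountTriggerSketch` is a hypothetical stand-in, not the real `CountTrigger`.

```java
import java.util.HashMap;
import java.util.Map;

// Flink-free sketch: after window W(0,2) fires at count 3, window W(2,4)
// should still fire after its own third element, proving its counter was
// not reset by the other window's firing.
public class CountTriggerSketch {

    private final int maxCount;
    private final Map<String, Integer> counts = new HashMap<>();

    CountTriggerSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    // returns true ("FIRE") when the per-window count reaches maxCount;
    // clears the window's state on firing, like CountTrigger's onElement
    boolean onElement(String window) {
        int c = counts.merge(window, 1, Integer::sum);
        if (c >= maxCount) {
            counts.remove(window);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(3);
        trigger.onElement("W(0,2)");
        trigger.onElement("W(2,4)");
        trigger.onElement("W(0,2)");
        System.out.println(trigger.onElement("W(0,2)")); // prints true: W(0,2) fires
        // W(2,4) already saw one element; two more should make it fire too
        trigger.onElement("W(2,4)");
        System.out.println(trigger.onElement("W(2,4)")); // prints true: count survived
    }
}
```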


> Refactor WindowOperator/Trigger Tests
> -
>
> Key: FLINK-4552
> URL: https://issues.apache.org/jira/browse/FLINK-4552
> Project: Flink
>  Issue Type: Improvement
>  Components: Windowing Operators
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and 
> {{WindowFunction}} are all conflated in {{WindowOperatorTest}}. All of these 
> test that a certain combination of a {{Trigger}}, {{WindowAssigner}} and 
> {{WindowFunction}} produce the expected output.
> We should modularize these tests and spread them out across multiple files, 
> possibly one per trigger, for the triggers. Also, we should extend/change the 
> tests in some key ways:
>  - {{WindowOperatorTest}} test should just verify that the interaction 
> between {{WindowOperator}} and the various other parts works 

[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests

2016-10-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85360067
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsWindowsTest.java
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link EventTimeSessionWindows}
+ */
+public class EventTimeSessionWindowsWindowsTest extends TestLogger {
+
+   @Test
+   public void testWindowAssignment() {
+   WindowAssigner.WindowAssignerContext mockContext =
+   
mock(WindowAssigner.WindowAssignerContext.class);
+
+   EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000));
--- End diff --

Very minor, but I think a named constant for the value 5000 might make the 
contract even clearer (e.g. `int gap = 5000`), adding `gap` to the window 
start time to obtain the end time.
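The suggested refactoring can be sketched in isolation: name the 5000 ms session gap once and derive expected window end times from it, instead of repeating the literal. The class and method here are placeholders, not Flink classes.

```java
// Sketch of the named-constant suggestion: the start/gap/end relationship
// becomes explicit in the code instead of being buried in magic numbers.
public class SessionGapConstant {

    // the session gap, named once (placeholder for the 5000 in the test)
    static final long GAP_MS = 5000;

    // end of a session window opened by a single element at `start`
    static long sessionEnd(long start) {
        return start + GAP_MS;
    }

    public static void main(String[] args) {
        // expected end times are expressed via the constant, so an assertion
        // like assertEquals(start + GAP_MS, end) states the contract directly
        System.out.println(sessionEnd(0));    // prints 5000
        System.out.println(sessionEnd(1000)); // prints 6000
    }
}
```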


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests

2016-10-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85361303
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
 ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness<T, W extends Window> {
+
+   private static final Integer KEY = 1;
+
+   private final Trigger<T, W> trigger;
+   private final TypeSerializer<W> windowSerializer;
+
+   private final HeapKeyedStateBackend<Integer> stateBackend;
+   private final TestInternalTimerService<Integer, W> internalTimerService;
+
+   public TriggerTestHarness(
+   Trigger<T, W> trigger,
+   TypeSerializer<W> windowSerializer) throws Exception {
+   this.trigger = trigger;
+   this.windowSerializer = windowSerializer;
+
+   // we only ever use one key, other tests make sure that windows work
+   // across different keys
+   DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+   MemoryStateBackend backend = new MemoryStateBackend();
+
+   @SuppressWarnings("unchecked")
+   HeapKeyedStateBackend<Integer> stateBackend =
+   (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
+   new JobID(),
+   "test_op",
+   IntSerializer.INSTANCE,
+   1,
+   new KeyGroupRange(0, 0),
+   new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+   this.stateBackend = stateBackend;
+
+   this.stateBackend.setCurrentKey(0);
+
+   this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
+   @Override
+   public void setCurrentKey(Object key) {
+   // ignore
+   }
+
+   @Override
+   public Object getCurrentKey() {
+   

[jira] [Commented] (FLINK-4552) Refactor WindowOperator/Trigger Tests

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612218#comment-15612218
 ] 

ASF GitHub Bot commented on FLINK-4552:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85359630
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+   /**
+* Verify that state of separate windows does not leak into other 
windows.
+*/
+   @Test
+   public void testWindowSeparationAndFiring() throws Exception {
+   TriggerTestHarness<Object, TimeWindow> testHarness =
+   new TriggerTestHarness<>(CountTrigger.of(3), new TimeWindow.Serializer());
+
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+   // shouldn't have any timers
+   assertEquals(0, testHarness.numProcessingTimeTimers());
+   assertEquals(0, testHarness.numEventTimeTimers());
+
+   assertEquals(2, testHarness.numStateEntries());
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+   // right now, CountTrigger will clear its state in onElement when firing
+   // ideally, this should be moved to onFire()
+   assertEquals(1, testHarness.numStateEntries());
+   assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+   }
+
+   /**
+* Verify that clear() does not leak across windows.
+*/
+   @Test
+   public void testClear() throws Exception {
+   TriggerTestHarness<Object, TimeWindow> testHarness =
+   new TriggerTestHarness<>(CountTrigger.of(3), new TimeWindow.Serializer());
+
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4)));
+
+   // shouldn't have any timers
+   assertEquals(0, testHarness.numProcessingTimeTimers());
+   assertEquals(0, testHarness.numEventTimeTimers());
+
+   assertEquals(2, testHarness.numStateEntries());
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+   testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+   

[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests

2016-10-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85361208
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
 ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness {
+
+   private static final Integer KEY = 1;
+
+   private final Trigger trigger;
+   private final TypeSerializer windowSerializer;
+
+   private final HeapKeyedStateBackend stateBackend;
+   private final TestInternalTimerService internalTimerService;
+
+   public TriggerTestHarness(
+   Trigger trigger,
+   TypeSerializer windowSerializer) throws Exception {
+   this.trigger = trigger;
+   this.windowSerializer = windowSerializer;
+
+   // we only ever use one key, other tests make sure that windows work
+   // across different keys
+   DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+   MemoryStateBackend backend = new MemoryStateBackend();
+
+   @SuppressWarnings("unchecked")
+   HeapKeyedStateBackend stateBackend = (HeapKeyedStateBackend) backend.createKeyedStateBackend(dummyEnv,
+   new JobID(),
+   "test_op",
+   IntSerializer.INSTANCE,
+   1,
+   new KeyGroupRange(0, 0),
+   new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+   this.stateBackend = stateBackend;
+
+   this.stateBackend.setCurrentKey(0);
+
+   this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
+   @Override
+   public void setCurrentKey(Object key) {
+   // ignore
+   }
+
+   @Override
+   public Object getCurrentKey() {
+   

[jira] [Commented] (FLINK-4552) Refactor WindowOperator/Trigger Tests

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612216#comment-15612216
 ] 

ASF GitHub Bot commented on FLINK-4552:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85360968
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyOnMergeContext;
+import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTimeWindow;
+import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTriggerContext;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link PurgingTrigger}.
+ */
+public class PurgingTriggerTest {
+
+   /**
+* Check if {@link PurgingTrigger} implements all methods of {@link 
Trigger}, as a sanity
+* check.
+*/
+   @Test
+   public void testAllMethodsImplemented() throws NoSuchMethodException {
--- End diff --

What is the purpose of this and how is it different from checking that 
PurgingTrigger is not abstract?
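The reflection-based sanity check being questioned here can be illustrated with a self-contained toy example (the `BaseTrigger`/`WrappingTrigger` classes below are hypothetical stand-ins, not Flink's `Trigger`/`PurgingTrigger`). The point of such a check is stronger than asserting the class is not abstract: a concrete wrapper can still silently *inherit* a default implementation instead of delegating, and `getDeclaredMethod` catches exactly that.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class Main {

    // Hypothetical stand-ins for Trigger / PurgingTrigger (NOT the Flink classes).
    abstract static class BaseTrigger {
        abstract int onElement(int value);
        abstract void clear();
        void onMerge() {} // default implementation a wrapper could silently inherit
    }

    static class WrappingTrigger extends BaseTrigger {
        int onElement(int value) { return value; }
        void clear() {}
        @Override void onMerge() {}
    }

    /**
     * Succeeds only if {@code sub} declares its own override of every instance
     * method declared by {@code base}. getDeclaredMethod does not consider
     * inherited methods, so an inherited default implementation fails the check.
     */
    static void assertAllMethodsOverridden(Class<?> base, Class<?> sub) throws NoSuchMethodException {
        for (Method m : base.getDeclaredMethods()) {
            if (Modifier.isStatic(m.getModifiers()) || m.isSynthetic()) {
                continue; // skip statics and compiler-generated methods
            }
            // throws NoSuchMethodException if the subclass does not declare it itself
            sub.getDeclaredMethod(m.getName(), m.getParameterTypes());
        }
    }

    public static void main(String[] args) throws Exception {
        assertAllMethodsOverridden(BaseTrigger.class, WrappingTrigger.class);
        System.out.println("all methods overridden");
    }
}
```

If `WrappingTrigger` dropped its `onMerge` override, the class would still be concrete, but the check above would throw.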


> Refactor WindowOperator/Trigger Tests
> -
>
> Key: FLINK-4552
> URL: https://issues.apache.org/jira/browse/FLINK-4552
> Project: Flink
>  Issue Type: Improvement
>  Components: Windowing Operators
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and 
> {{WindowFunction}} are all conflated in {{WindowOperatorTest}}. All of these 
> test that a certain combination of a {{Trigger}}, {{WindowAssigner}} and 
> {{WindowFunction}} produce the expected output.
> We should modularize these tests and spread them out across multiple files, 
> possibly one per trigger, for the triggers. Also, we should extend/change the 
> tests in some key ways:
>  - {{WindowOperatorTest}} test should just verify that the interaction 
> between {{WindowOperator}} and the various other parts works as expected, 
> that the correct methods on {{Trigger}} and {{WindowFunction}} are called at 
> the expected time and that snapshotting, timers, cleanup etc. work correctly. 
> These tests should also verify that the different state types and 
> {{WindowFunctions}} work correctly.
>  - {{Trigger}} tests should present elements to triggers and verify that they 
> fire at the correct times. The actual output of the {{WindowFunction}} is not 
> important for these tests. We should also test that triggers correctly clean 
> up state and timers.
>  - {{WindowAssigner}} tests should test each window assigner and also verify 
> that, for example, the offset parameter of time-based windows works correctly.
> There is already {{WindowingTestHarness}} but it is not used by tests, I 
> think we can expand on that and provide more thorough test coverage while 
> also making the tests more maintainable ({{WindowOperatorTest.java}} is 
> 

[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests

2016-10-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85359406
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+   /**
+* Verify that state of separate windows does not leak into other 
windows.
+*/
+   @Test
+   public void testWindowSeparationAndFiring() throws Exception {
+   TriggerTestHarness testHarness =
+   new TriggerTestHarness<>(CountTrigger.of(3), new TimeWindow.Serializer());
+
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(2, 4)));
+
+   // shouldn't have any timers
+   assertEquals(0, testHarness.numProcessingTimeTimers());
+   assertEquals(0, testHarness.numEventTimeTimers());
+
+   assertEquals(2, testHarness.numStateEntries());
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord(1), new TimeWindow(0, 2)));
+   assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(2, 4)));
+
+   // right now, CountTrigger will clear its state in onElement when firing
+   // ideally, this should be moved to onFire()
+   assertEquals(1, testHarness.numStateEntries());
+   assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+   assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+   }
--- End diff --

You could also let W(2,4) fire for completeness, verifying that its count was not reset by the previous firing.
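The semantics under discussion can be reasoned about with a small self-contained model of CountTrigger's per-window counting (a hypothetical sketch — the real trigger keeps its count in Flink partitioned state, not in a map): after W(0,2) fires and clears its own count, W(2,4) still fires independently on its third element.

```java
import java.util.HashMap;
import java.util.Map;

public class Main {
    enum TriggerResult { CONTINUE, FIRE }

    // Toy model of CountTrigger: one element counter per window, cleared on firing.
    static class CountTriggerModel {
        private final long maxCount;
        private final Map<String, Long> countPerWindow = new HashMap<>();

        CountTriggerModel(long maxCount) { this.maxCount = maxCount; }

        TriggerResult onElement(String window) {
            long count = countPerWindow.merge(window, 1L, Long::sum);
            if (count >= maxCount) {
                countPerWindow.remove(window); // state cleared on firing (cf. comment in the diff)
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

        int numStateEntries() { return countPerWindow.size(); }
    }

    public static void main(String[] args) {
        CountTriggerModel trigger = new CountTriggerModel(3);
        // three elements to window (0,2): the third one fires and clears its state
        trigger.onElement("w02");
        trigger.onElement("w02");
        System.out.println(trigger.onElement("w02")); // FIRE
        // window (2,4) counts independently and fires on its own third element
        trigger.onElement("w24");
        trigger.onElement("w24");
        System.out.println(trigger.onElement("w24")); // FIRE
        System.out.println(trigger.numStateEntries()); // 0
    }
}
```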


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2572: [FLINK-4552] Refactor WindowOperator/Trigger Tests

2016-10-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2572#discussion_r85360968
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyOnMergeContext;
+import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTimeWindow;
+import static 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTriggerContext;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link PurgingTrigger}.
+ */
+public class PurgingTriggerTest {
+
+   /**
+* Check if {@link PurgingTrigger} implements all methods of {@link 
Trigger}, as a sanity
+* check.
+*/
+   @Test
+   public void testAllMethodsImplemented() throws NoSuchMethodException {
--- End diff --

What is the purpose of this and how is it different from checking that 
PurgingTrigger is not abstract?




[jira] [Commented] (FLINK-4733) Port WebFrontend to new metric system

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612172#comment-15612172
 ] 

ASF GitHub Bot commented on FLINK-4733:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2616
  
Thank you for rebasing.
This run had the following error:
https://s3.amazonaws.com/archive.travis-ci.org/jobs/170428661/log.txt
```
Failed tests: 
  CoordinatorShutdownTest.testCoordinatorShutsDownOnFailure:94 org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph cannot be cast to org.apache.flink.runtime.executiongraph.ExecutionGraph
```
I suspect we need to change the cast there to `AccessExecutionGraph`.
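The proposed fix can be illustrated with a minimal self-contained sketch (the `AccessGraph`/`LiveGraph`/`ArchivedGraph` types below are hypothetical stand-ins for `AccessExecutionGraph`, `ExecutionGraph` and `ArchivedExecutionGraph`): casting to the concrete class fails once the holder may hand back an archived graph, while casting to the common read-only interface works for both.

```java
public class Main {
    // Hypothetical stand-ins; AccessGraph mirrors the shared read-only interface.
    interface AccessGraph { String getJobName(); }
    static class LiveGraph implements AccessGraph {
        public String getJobName() { return "live"; }
    }
    static class ArchivedGraph implements AccessGraph {
        public String getJobName() { return "archived"; }
    }

    public static void main(String[] args) {
        Object fromTracker = new ArchivedGraph();
        try {
            LiveGraph live = (LiveGraph) fromTracker; // the failing pattern from the test
            System.out.println(live.getJobName());
        } catch (ClassCastException e) {
            System.out.println("cast to concrete type fails");
        }
        // casting to the common interface succeeds for live and archived graphs alike
        AccessGraph g = (AccessGraph) fromTracker;
        System.out.println(g.getJobName()); // archived
    }
}
```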


> Port WebFrontend to new metric system
> -
>
> Key: FLINK-4733
> URL: https://issues.apache.org/jira/browse/FLINK-4733
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, TaskManager, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0
>
>
> While the WebFrontend has access to the metric system it still relies on 
> older code in some parts.
> The TaskManager metrics are still gathered using the Codahale library and 
> send with the heartbeats.
> Task related metrics (numRecordsIn etc) are still gathered using 
> accumulators, which are accessed through the execution graph.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2616: [FLINK-4733] Port WebInterface to metric system

2016-10-27 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2616
  
Thank you for rebasing.
This run had the following error:
https://s3.amazonaws.com/archive.travis-ci.org/jobs/170428661/log.txt
```
Failed tests: 
  CoordinatorShutdownTest.testCoordinatorShutsDownOnFailure:94 org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph cannot be cast to org.apache.flink.runtime.executiongraph.ExecutionGraph
```
I suspect we need to change the cast there to `AccessExecutionGraph`.




[jira] [Resolved] (FLINK-4756) NullPointerException on submiting a job with StreamExecutionEnvironment.createRemoteEnvironment to a flink cluster

2016-10-27 Thread Ruwen Moos (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruwen Moos resolved FLINK-4756.
---
Resolution: Not A Bug

The Flink versions used for the cluster and in the Maven project were too far apart.

If you hit this error, check that the Flink version used for building matches the version of the cluster.

> NullPointerException on submiting a job with 
> StreamExecutionEnvironment.createRemoteEnvironment to a flink cluster
> --
>
> Key: FLINK-4756
> URL: https://issues.apache.org/jira/browse/FLINK-4756
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.1.2
> Environment: cluster@centOS7 in a docker container
>Reporter: Ruwen Moos
>
> Flink cluster throws the following exception when I try to send my job to my 
> Flink cluster with StreamExecutionEnvironment.createRemoteEnvironment.
> 2016-10-04 07:29:46,106 ERROR org.apache.flink.runtime.jobmanager.JobManager  
>   - Failed to submit job 08f41104b8b523380fbc8f0d7d1da6f1 (Flink 
> Java Job at Tue Oct 04 09:29:37 CEST 2016)
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:478)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:121)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> The Flink programm runs as a standalone flink programm with 
> StreamExecutionEnvironment.getExecutionEnvironment() without any issues. With 
> getExecutionEnvironment() uploading via the web gui works when running it on 
> the cluster, just not via a RemoteStreamEnvironment
> Same exception also happens when using a local cluster on windows.
> I also tried running the wordcount example from flink-examples on the 
> cluster, same result.
> Jars for the job are created with the maven-assembly-plugin.





[GitHub] flink issue #2694: [FLINK-4925] [metrics] Integrate meters into IOMetricGrou...

2016-10-27 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2694
  
+1 to merge.




[jira] [Commented] (FLINK-4925) Integrate meters into IOMetricGroups

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612162#comment-15612162
 ] 

ASF GitHub Bot commented on FLINK-4925:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2694
  
+1 to merge.


> Integrate meters into IOMetricGroups
> 
>
> Key: FLINK-4925
> URL: https://issues.apache.org/jira/browse/FLINK-4925
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.3
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0
>
>






[jira] [Commented] (FLINK-2597) Add a test for Avro-serialized Kafka messages

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612146#comment-15612146
 ] 

ASF GitHub Bot commented on FLINK-2597:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2705

[FLINK-2597][FLINK-4050] Add wrappers for Kafka serializers, test for 
partitioner and documentation

This pull requests addresses the following JIRAs:
- [FLINK-2597 Add a test for Avro-serialized Kafka messages](https://issues.apache.org/jira/browse/FLINK-2597)
- [FLINK-4050 FlinkKafkaProducer API Refactor](https://issues.apache.org/jira/browse/FLINK-4050)


The PR adds:
- `KafkaSerializerWrapper` and `KafkaDeserializerWrapper` wrappers for 
using Kafka serializers with Flink
- A test case involving Confluent's `KafkaAvroSerializer` and 
`KafkaAvroDeserializer`. They also use the schema registry from confluent 
(which I'm mocking with a simple test http server in the test).
- A test validating that we are properly calling Kafka partitioners with 
the producer
- Documentation for everything mentioned above.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink2597

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2705.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2705


commit 76365abb8ae378eaee3809ec050a09e266d745a2
Author: Robert Metzger 
Date:   2016-10-26T18:48:21Z

[FLINK-2597] Add wrappers for Kafka serializers, tests for Kafka 
partitioner and documentation




> Add a test for Avro-serialized Kafka messages 
> --
>
> Key: FLINK-2597
> URL: https://issues.apache.org/jira/browse/FLINK-2597
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Kafka 
> Connector
>Affects Versions: 0.10.0
>Reporter: Robert Metzger
>Assignee: Vimal
>Priority: Minor
>
> A user has asked for serializing Avro messages from Kafka.
> I think its a legitimate use-case that we should cover by a test case.





[GitHub] flink pull request #2705: [FLINK-2597][FLINK-4050] Add wrappers for Kafka se...

2016-10-27 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2705

[FLINK-2597][FLINK-4050] Add wrappers for Kafka serializers, test for 
partitioner and documentation

This pull requests addresses the following JIRAs:
- [FLINK-2597 Add a test for Avro-serialized Kafka messages](https://issues.apache.org/jira/browse/FLINK-2597)
- [FLINK-4050 FlinkKafkaProducer API Refactor](https://issues.apache.org/jira/browse/FLINK-4050)


The PR adds:
- `KafkaSerializerWrapper` and `KafkaDeserializerWrapper` wrappers for 
using Kafka serializers with Flink
- A test case involving Confluent's `KafkaAvroSerializer` and 
`KafkaAvroDeserializer`. They also use the schema registry from confluent 
(which I'm mocking with a simple test http server in the test).
- A test validating that we are properly calling Kafka partitioners with 
the producer
- Documentation for everything mentioned above.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink2597

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2705.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2705


commit 76365abb8ae378eaee3809ec050a09e266d745a2
Author: Robert Metzger 
Date:   2016-10-26T18:48:21Z

[FLINK-2597] Add wrappers for Kafka serializers, tests for Kafka 
partitioner and documentation






[jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612123#comment-15612123
 ] 

ASF GitHub Bot commented on FLINK-2608:
---

Github user chermenin commented on the issue:

https://github.com/apache/flink/pull/2623
  
I excluded Kryo as a dependency from Chill and added a simple test to check Java collections. The tests were added to `flink-tests` because it depends on `flink-runtime` (which uses Chill).
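Part of what makes this case tricky is that `Arrays.asList` does not return a `java.util.ArrayList` at all, but the private class `java.util.Arrays$ArrayList`, which has no no-arg constructor — serializers that instantiate collections reflectively (as default Kryo can) may fail on it. A self-contained illustration of the runtime-class difference (the Kryo/Chill behavior itself is not reproduced here):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        List<Integer> fromAsList = Arrays.asList(0, 0, 0);
        List<Integer> plain = new ArrayList<>(fromAsList);

        // Equal in content, but different runtime classes — only the second
        // is the public java.util.ArrayList with a no-arg constructor.
        System.out.println(fromAsList.getClass().getName()); // java.util.Arrays$ArrayList
        System.out.println(plain.getClass().getName());      // java.util.ArrayList
    }
}
```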


> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the 
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> text = env.fromElements(
> "Who's there?",
> "I think I hear them. Stand, ho! Who's there?");
> // DOES NOT WORK
> List<Integer> elements = Arrays.asList(0, 0, 0);
> // The following works:
> //List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));
> DataSet<TestClass> set = env.fromElements(new TestClass(elements));
> DataSet<Tuple2<String, Integer>> wordCounts = text
> .flatMap(new LineSplitter())
> .withBroadcastSet(set, "set")
> .groupBy(0)
> .sum(1);
> wordCounts.print();
> }
> public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
> @Override
> public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
> for (String word : line.split(" ")) {
> out.collect(new Tuple2<String, Integer>(word, 1));
> }
> }
> }
> public static class TestClass implements Serializable {
> private static final long serialVersionUID = -2932037991574118651L;
> List<Integer> integerList;
> public TestClass(List<Integer> integerList){
> this.integerList=integerList;
> }
> }
> {code}
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at main(Test.java:32) 
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the 
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 

[GitHub] flink issue #2623: [FLINK-2608] Updated Twitter Chill version.

2016-10-27 Thread chermenin
Github user chermenin commented on the issue:

https://github.com/apache/flink/pull/2623
  
I excluded Kryo as a dependency from Chill and added a simple test to check Java collections. The tests were added to `flink-tests` because it depends on `flink-runtime` (which uses Chill).




[GitHub] flink issue #2696: [FLINK-3347] [akka] Add QuarantineMonitor which shuts a q...

2016-10-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2696
  
Thanks for the review @StephanEwen. Yes let's do it as you've proposed. 
I've opened an [issue](https://issues.apache.org/jira/browse/FLINK-4944) for 
replacing Akka's death watch with our own heartbeat on the `TaskManager` side.




[jira] [Closed] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes

2016-10-27 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-4037.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
> Fix For: 1.2.0
>
>
> As a follow up to FLINK-4011: In order to allow the JobManager to unload all 
> classes from a finished job, we need to convert the ExecutionGraph (and some 
> attached objects like the ExecutionConfig) into a stringified version, not 
> containing any user classes.
> The web frontend can show strings only anyway.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2697: [backport] [FLINK-3347] [akka] Add QuarantineMonit...

2016-10-27 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2697




[jira] [Commented] (FLINK-3347) TaskManager (or its ActorSystem) need to restart in case they notice quarantine

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612084#comment-15612084
 ] 

ASF GitHub Bot commented on FLINK-3347:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2696
  
Thanks for the review @StephanEwen. Yes let's do it as you've proposed. 
I've opened an [issue](https://issues.apache.org/jira/browse/FLINK-4944) for 
replacing Akka's death watch with our own heartbeat on the `TaskManager` side.


> TaskManager (or its ActorSystem) need to restart in case they notice 
> quarantine
> ---
>
> Key: FLINK-3347
> URL: https://issues.apache.org/jira/browse/FLINK-3347
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.0.0, 1.2.0, 1.1.4
>
>
> There are cases where Akka quarantines remote actor systems. In that case, no 
> further communication is possible with that actor system unless one of the 
> two actor systems is restarted.
> The result is that a TaskManager is up and available, but cannot register at 
> the JobManager (Akka refuses connection because of the quarantined state), 
> making the TaskManager a useless process.
> I suggest to let the TaskManager restart itself once it notices that either 
> it quarantined the JobManager, or the JobManager quarantined it.
> It is possible to recognize that by listening to certain events in the actor 
> system event stream: 
> http://stackoverflow.com/questions/32471088/akka-cluster-detecting-quarantined-state
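
The listen-and-restart pattern from the linked StackOverflow answer can be sketched without an Akka dependency. The real monitor would subscribe to the `ActorSystem`'s event stream for `akka.remote.QuarantinedEvent`; the event-stream and monitor classes below are hand-rolled stand-ins:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class QuarantineSketch {
    // Stand-in for Akka's quarantine notification; in a real system this would
    // be akka.remote.QuarantinedEvent published on the actor system event stream.
    static final class QuarantinedEvent {
        final String remoteAddress;
        QuarantinedEvent(String remoteAddress) { this.remoteAddress = remoteAddress; }
    }

    // Minimal publish/subscribe bus, mirroring ActorSystem#eventStream.
    static final class EventStream {
        private final List<Consumer<QuarantinedEvent>> subscribers = new ArrayList<>();
        void subscribe(Consumer<QuarantinedEvent> s) { subscribers.add(s); }
        void publish(QuarantinedEvent e) { subscribers.forEach(s -> s.accept(e)); }
    }

    // Once quarantined, the only way to re-establish communication is to
    // restart one of the actor systems, so the monitor requests a restart.
    static final class QuarantineMonitor {
        volatile boolean restartRequested = false;
        void onQuarantined(QuarantinedEvent e) {
            // In the TaskManager this would trigger a process/actor-system restart.
            restartRequested = true;
        }
    }

    public static void main(String[] args) {
        EventStream stream = new EventStream();
        QuarantineMonitor monitor = new QuarantineMonitor();
        stream.subscribe(monitor::onQuarantined);
        stream.publish(new QuarantinedEvent("akka.tcp://flink@jobmanager:6123"));
        System.out.println("restartRequested=" + monitor.restartRequested);
    }
}
```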





[jira] [Created] (FLINK-4944) Replace Akka's death watch with own heartbeat on the TM side

2016-10-27 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4944:


 Summary: Replace Akka's death watch with own heartbeat on the TM side
 Key: FLINK-4944
 URL: https://issues.apache.org/jira/browse/FLINK-4944
 Project: Flink
  Issue Type: Improvement
  Components: TaskManager
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.2.0


In order to properly implement FLINK-3347, the {{TaskManager}} must no longer 
use Akka's death watch mechanism to detect {{JobManager}} failures. The reason 
is that a hard {{JobManager}} failure will lead to the {{TaskManagers}} 
quarantining the {{JobManager}}'s {{ActorSystem}}. In combination with 
FLINK-3347, this would lead to a shutdown of all {{TaskManagers}}.

Instead, we should use our own heartbeat signal to detect dead {{JobManagers}}. 
In case of a heartbeat timeout, the {{TaskManager}} won't shut down but will 
simply cancel and clear everything.
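
A minimal, dependency-free sketch of such a heartbeat timeout check (the names and timeout handling are illustrative, not Flink's actual implementation):

```java
public class HeartbeatSketch {
    // Deadline-based heartbeat tracking for the JobManager, as seen from a TM.
    static final class HeartbeatMonitor {
        private final long timeoutMillis;
        private long lastHeartbeat;

        HeartbeatMonitor(long timeoutMillis, long nowMillis) {
            this.timeoutMillis = timeoutMillis;
            this.lastHeartbeat = nowMillis;
        }

        void reportHeartbeat(long nowMillis) { lastHeartbeat = nowMillis; }

        // True once the JobManager is presumed dead. The TaskManager then
        // cancels its tasks and clears its state, but keeps the process alive
        // (unlike death watch + quarantine, which forces a full restart).
        boolean isTimedOut(long nowMillis) {
            return nowMillis - lastHeartbeat > timeoutMillis;
        }
    }

    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(5000, 0);
        monitor.reportHeartbeat(4000);
        // t=8000 is within 5000ms of the last beat; t=10000 is not.
        System.out.println("alive at t=8000:  " + !monitor.isTimedOut(8000));
        System.out.println("alive at t=10000: " + !monitor.isTimedOut(10000));
    }
}
```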





[jira] [Commented] (FLINK-3347) TaskManager (or its ActorSystem) need to restart in case they notice quarantine

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612066#comment-15612066
 ] 

ASF GitHub Bot commented on FLINK-3347:
---

Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2697


> TaskManager (or its ActorSystem) need to restart in case they notice 
> quarantine
> ---
>
> Key: FLINK-3347
> URL: https://issues.apache.org/jira/browse/FLINK-3347
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.0.0, 1.2.0, 1.1.4
>
>
> There are cases where Akka quarantines remote actor systems. In that case, no 
> further communication is possible with that actor system unless one of the 
> two actor systems is restarted.
> The result is that a TaskManager is up and available, but cannot register at 
> the JobManager (Akka refuses connection because of the quarantined state), 
> making the TaskManager a useless process.
> I suggest to let the TaskManager restart itself once it notices that either 
> it quarantined the JobManager, or the JobManager quarantined it.
> It is possible to recognize that by listening to certain events in the actor 
> system event stream: 
> http://stackoverflow.com/questions/32471088/akka-cluster-detecting-quarantined-state





[GitHub] flink issue #2691: [FLINK-4910] Introduce safety net for closing file system...

2016-10-27 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2691
  
Thanks for the reviews @StephanEwen and @aljoscha ! I addressed your 
comments in my last commit.




[jira] [Commented] (FLINK-4910) Introduce safety net for closing file system streams

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611984#comment-15611984
 ] 

ASF GitHub Bot commented on FLINK-4910:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2691
  
Thanks for the reviews @StephanEwen and @aljoscha ! I addressed your 
comments in my last commit.


> Introduce safety net for closing file system streams
> 
>
> Key: FLINK-4910
> URL: https://issues.apache.org/jira/browse/FLINK-4910
> Project: Flink
>  Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Streams that are opened through {{FileSystem}} must be closed at the end of 
> their life cycle. However, we found hints that some code forgets to close 
> such streams.
> We should introduce i) a mechanism that closes leaked, unclosed streams after 
> usage, and ii) logging that helps us track down and fix the sources of such 
> leaks.
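
A minimal sketch of such a safety net, assuming a registry that wraps streams at open time and force-closes leaks at the end of the life cycle (the class names are hypothetical; Flink's actual mechanism and its logging will differ):

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class SafetyNetSketch {
    // Tracks streams handed out by a FileSystem-like factory and force-closes
    // anything still open when the owning task ends, counting (and, in a real
    // implementation, logging) each leak so its origin can be fixed.
    static final class ClosingRegistry implements Closeable {
        private final Set<Closeable> open =
                Collections.newSetFromMap(new IdentityHashMap<>());
        int leaksClosed = 0;

        InputStream register(InputStream in) {
            open.add(in);
            // Wrap so that a well-behaved close() also deregisters the stream.
            return new InputStream() {
                @Override public int read() throws IOException { return in.read(); }
                @Override public void close() throws IOException {
                    open.remove(in);
                    in.close();
                }
            };
        }

        @Override public void close() throws IOException {
            for (Closeable leaked : open) {
                leaksClosed++;   // a real registry would log a stack trace
                leaked.close();  // captured when the stream was opened
            }
            open.clear();
        }
    }

    public static void main(String[] args) throws IOException {
        ClosingRegistry registry = new ClosingRegistry();
        InputStream wellBehaved = registry.register(new ByteArrayInputStream(new byte[]{1}));
        wellBehaved.close();
        registry.register(new ByteArrayInputStream(new byte[]{2})); // leaked on purpose
        registry.close();
        System.out.println("leaks closed: " + registry.leaksClosed); // 1
    }
}
```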





[jira] [Commented] (FLINK-4876) Allow web interface to be bound to a specific ip/interface/inetHost

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611981#comment-15611981
 ] 

ASF GitHub Bot commented on FLINK-4876:
---

Github user attachmentgenie commented on the issue:

https://github.com/apache/flink/pull/2680
  
@StephanEwen seems a reasonable thing to do, I updated the code to use the 
new ConfigOption method.


> Allow web interface to be bound to a specific ip/interface/inetHost
> ---
>
> Key: FLINK-4876
> URL: https://issues.apache.org/jira/browse/FLINK-4876
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.1.2, 1.1.3
>Reporter: Bram Vogelaar
>Assignee: Bram Vogelaar
>Priority: Minor
>
> Currently the web interface automatically binds to all interfaces on 0.0.0.0. 
> IMHO there are some use cases to only bind to a specific ipadress, (e.g. 
> access through an authenticated proxy, not binding on the management or 
> backup interface)
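
The underlying bind choice can be sketched with plain `java.net` (the actual change presumably threads a configured address into the web server's bootstrap; the helper below is illustrative only):

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class BindAddressSketch {
    // Bind a server socket to a specific address instead of the 0.0.0.0
    // wildcard; a null address means all interfaces (the current behavior).
    static ServerSocket bind(String addressOrNull, int port) throws IOException {
        ServerSocket socket = new ServerSocket();
        InetSocketAddress bindAddress = addressOrNull == null
                ? new InetSocketAddress(port)                               // 0.0.0.0
                : new InetSocketAddress(InetAddress.getByName(addressOrNull), port);
        socket.bind(bindAddress);
        return socket;
    }

    public static void main(String[] args) throws IOException {
        // Port 0 lets the OS pick a free port; loopback-only binding keeps the
        // UI reachable only through, e.g., an authenticated local proxy.
        try (ServerSocket s = bind("127.0.0.1", 0)) {
            System.out.println("bound to " + s.getInetAddress().getHostAddress());
        }
    }
}
```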





[GitHub] flink issue #2680: [FLINK-4876] Allow web interface to be bound to a specifi...

2016-10-27 Thread attachmentgenie
Github user attachmentgenie commented on the issue:

https://github.com/apache/flink/pull/2680
  
@StephanEwen seems a reasonable thing to do, I updated the code to use the 
new ConfigOption method.




[jira] [Commented] (FLINK-4943) flink-mesos/ConfigConstants: Typo: YYARN -> YARN

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611940#comment-15611940
 ] 

ASF GitHub Bot commented on FLINK-4943:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2704
  
I'd recommend collecting typographical and grammatical errors within 
comments into a larger ticket and pull request.


> flink-mesos/ConfigConstants: Typo: YYARN -> YARN
> 
>
> Key: FLINK-4943
> URL: https://issues.apache.org/jira/browse/FLINK-4943
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Mischa Krüger
>Priority: Trivial
>






[jira] [Commented] (FLINK-3347) TaskManager (or its ActorSystem) need to restart in case they notice quarantine

2016-10-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611963#comment-15611963
 ] 

ASF GitHub Bot commented on FLINK-3347:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2697
  
Thanks for the review @StephanEwen. I will change the configuration name 
and then merge the PR into the release-1.1 branch.

Travis passed locally and the failing test cases are unrelated.


> TaskManager (or its ActorSystem) need to restart in case they notice 
> quarantine
> ---
>
> Key: FLINK-3347
> URL: https://issues.apache.org/jira/browse/FLINK-3347
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.0.0, 1.2.0, 1.1.4
>
>
> There are cases where Akka quarantines remote actor systems. In that case, no 
> further communication is possible with that actor system unless one of the 
> two actor systems is restarted.
> The result is that a TaskManager is up and available, but cannot register at 
> the JobManager (Akka refuses connection because of the quarantined state), 
> making the TaskManager a useless process.
> I suggest to let the TaskManager restart itself once it notices that either 
> it quarantined the JobManager, or the JobManager quarantined it.
> It is possible to recognize that by listening to certain events in the actor 
> system event stream: 
> http://stackoverflow.com/questions/32471088/akka-cluster-detecting-quarantined-state





[GitHub] flink issue #2697: [backport] [FLINK-3347] [akka] Add QuarantineMonitor whic...

2016-10-27 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2697
  
Thanks for the review @StephanEwen. I will change the configuration name 
and then merge the PR into the release-1.1 branch.

Travis passed locally and the failing test cases are unrelated.



