[
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
shuai.xu reassigned FLINK-4928:
---
Assignee: shuai.xu
> Implement FLIP-6 YARN Application Master Runner
> ---
>
> Key: FLINK-4928
> URL: https://issues.apache.org/jira/browse/FLINK-4928
> Project: Flink
> Issue Type: Sub-task
> Components: YARN
> Environment: {{flip-6}} feature branch
>Reporter: Stephan Ewen
>Assignee: shuai.xu
>
> The Application Master Runner is the master process started in a YARN
> container when submitting the Flink-on-YARN job to YARN.
> It has the following data available:
> - Flink jars
> - Job jars
> - JobGraph
> - Environment variables
> - Contextual information like security tokens and certificates
> Its responsibility is the following:
> - Read all configuration and environment variables, computing the effective
> configuration
> - Start all shared components (Rpc, HighAvailability Services)
> - Start the ResourceManager
> - Start the JobManager Runner
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Manu Zhang created FLINK-4952:
-
Summary: Add Scala API for KeyedStream.flatMap
Key: FLINK-4952
URL: https://issues.apache.org/jira/browse/FLINK-4952
Project: Flink
Issue Type: Improvement
Components: DataStream API, Scala API
Reporter: Manu Zhang
--
Manu Zhang created FLINK-4951:
-
Summary: Better Javadocs for KeyedStream.flatMap
Key: FLINK-4951
URL: https://issues.apache.org/jira/browse/FLINK-4951
Project: Flink
Issue Type: Improvement
Reporter: Manu Zhang
Assignee: Aljoscha Krettek
Priority: Minor
It seems the Javadoc is just copied over from {{DataStream.flatMap}} rather
than explaining what the function is for.
--
[
https://issues.apache.org/jira/browse/FLINK-4378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15614076#comment-15614076
]
ASF GitHub Bot commented on FLINK-4378:
---
Github user wenlong88 commented on the issue:
https://github.com/apache/flink/pull/2356
All of the tests in Travis have passed. Thanks for merging this. ^_^
> Enable RollingSink to custom HDFS client configuration
> --
>
> Key: FLINK-4378
> URL: https://issues.apache.org/jira/browse/FLINK-4378
> Project: Flink
> Issue Type: Improvement
> Components: filesystem-connector
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> Tuning the HDFS client configuration for different situations, for example
> {{io.file.buffer.size}}, can make the rolling sink perform better.
--
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[
https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15614058#comment-15614058
]
ASF GitHub Bot commented on FLINK-4923:
---
Github user zhuhaifengleon commented on a diff in the pull request:
https://github.com/apache/flink/pull/2693#discussion_r85463825
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -347,6 +348,12 @@ public Task(
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
+
+ // add metrics for buffers
+ this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
--- End diff --
Stephan, have you merged this already? If you have, I will make this change in
a follow-up patch after discussing it with Chesnay.
> Expose input/output buffers and bufferPool usage as a metric for a Task
> ---
>
> Key: FLINK-4923
> URL: https://issues.apache.org/jira/browse/FLINK-4923
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> We should expose the following Metrics on the TaskIOMetricGroup:
> 1. Buffers.inputQueueLength: received buffers of InputGates for a task
> 2. Buffers.outputQueueLength: buffers of produced ResultPartitions for a task
> 3. Buffers.inPoolUsage: usage of InputGates buffer pool for a task
> 4. Buffers.outPoolUsage: usage of produced ResultPartitions buffer pool for
> a task
--
Vijay Srinivasaraghavan created FLINK-4950:
--
Summary: Add support to include multiple Yarn application entries
in Yarn properties file
Key: FLINK-4950
URL: https://issues.apache.org/jira/browse/FLINK-4950
Project: Flink
Issue Type: Task
Components: YARN Client
Reporter: Vijay Srinivasaraghavan
Assignee: Vijay Srinivasaraghavan
Priority: Minor
When deploying Flink on Yarn using the CLI, a Yarn properties file is created in
the /tmp directory and persisted with the application ID along with a few other
properties.
This JIRA proposes two changes to the current implementation:
1) The properties file should be created in the user's home directory so that
the configuration is not leaked.
2) The properties file should support multiple application entries.
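The two changes proposed above could be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink's actual implementation: the class name, the file name `.yarn-properties`, and the key prefix `yarn.application.` are all hypothetical, and the ticket does not specify the real file layout.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper; names and file layout are assumptions for illustration. */
class YarnPropertiesSketch {

    // Hypothetical key prefix: one property per running YARN application.
    private static final String APP_KEY_PREFIX = "yarn.application.";

    /** Assumed default location in the user's home directory instead of /tmp. */
    static Path defaultFile() {
        return Paths.get(System.getProperty("user.home"), ".yarn-properties");
    }

    /** Adds one application entry while keeping entries that are already stored. */
    static void addApplication(Path file, String applicationId, String jobManagerAddress)
            throws IOException {
        Properties props = load(file);
        props.setProperty(APP_KEY_PREFIX + applicationId, jobManagerAddress);
        Files.createDirectories(file.toAbsolutePath().getParent());
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "Flink YARN applications (illustrative format)");
        }
    }

    /** Returns the stored application IDs in sorted order. */
    static List<String> listApplications(Path file) throws IOException {
        List<String> ids = new ArrayList<>();
        for (String key : load(file).stringPropertyNames()) {
            if (key.startsWith(APP_KEY_PREFIX)) {
                ids.add(key.substring(APP_KEY_PREFIX.length()));
            }
        }
        Collections.sort(ids);
        return ids;
    }

    /** Loads the file if it exists; otherwise returns an empty property set. */
    private static Properties load(Path file) throws IOException {
        Properties props = new Properties();
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in);
            }
        }
        return props;
    }
}
```

Keeping one key per application ID means a second deployment adds an entry rather than overwriting the first one.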
--
Greg Hogan created FLINK-4949:
-
Summary: Refactor Gelly driver inputs
Key: FLINK-4949
URL: https://issues.apache.org/jira/browse/FLINK-4949
Project: Flink
Issue Type: Improvement
Components: Gelly
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
The Gelly drivers started as simple wrappers around library algorithms but have
grown to handle a matrix of input sources while often running multiple
algorithms and analytics with custom parameterization.
This ticket will refactor the sourcing of the input graph into separate classes
for CSV files and RMat which will simplify the inclusion of new data sources.
--
[
https://issues.apache.org/jira/browse/FLINK-4946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15613223#comment-15613223
]
ASF GitHub Bot commented on FLINK-4946:
---
GitHub user greghogan opened a pull request:
https://github.com/apache/flink/pull/2708
[FLINK-4946] [scripts] Load jar files from subdirectories of lib
The Flink classpath is a concatenation of jar files in lib/. This commit
includes files from subdirectories of lib/ in the classpath.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/greghogan/flink 4946_load_jar_files_from_subdirectories_of_lib
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2708.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2708
commit 19cb4d5bee2863aedf111c1e6e69475c4205a360
Author: Greg Hogan
Date: 2016-10-27T19:14:36Z
[FLINK-4946] [scripts] Load jar files from subdirectories of lib
The Flink classpath is a concatenation of jar files in lib/. This commit
includes files from subdirectories of lib/ in the classpath.
> Load jar files from subdirectories of lib
> -
>
> Key: FLINK-4946
> URL: https://issues.apache.org/jira/browse/FLINK-4946
> Project: Flink
> Issue Type: Improvement
> Components: Startup Shell Scripts
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Users can more easily track Flink jars with transitive dependencies when
> copied into subdirectories of {{lib}}. This is the arrangement of {{opt}} for
> FLINK-4861.
--
[
https://issues.apache.org/jira/browse/FLINK-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15613064#comment-15613064
]
Jamie Grier commented on FLINK-4948:
Makes sense. Maybe a scheme where we can verify that the checkpoint is at
least self-consistent -- using only data stored in the checkpoint itself.
> Consider using checksums or similar to detect bad checkpoints
> -
>
> Key: FLINK-4948
> URL: https://issues.apache.org/jira/browse/FLINK-4948
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
>Reporter: Jamie Grier
> Fix For: 1.2.0
>
>
> We should consider proactively checking to verify that checkpoints are valid
> when reading (and maybe writing). This should help prevent any possible
> state corruption issues that might otherwise go undetected.
--
[
https://issues.apache.org/jira/browse/FLINK-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612892#comment-15612892
]
Gyula Fora commented on FLINK-4948:
---
I think in general this would be very nice, but what about tools that might want
to alter checkpoints intentionally, for example to clean or transform the
state? So I think we should aim for some flexible checking logic here.
--
Jamie Grier created FLINK-4948:
--
Summary: Consider using checksums or similar to detect bad
checkpoints
Key: FLINK-4948
URL: https://issues.apache.org/jira/browse/FLINK-4948
Project: Flink
Issue Type: Improvement
Components: DataStream API
Reporter: Jamie Grier
Fix For: 1.2.0
We should consider proactively checking to verify that checkpoints are valid
when reading (and maybe writing). This should help prevent any possible state
corruption issues that might otherwise go undetected.
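One way to picture the check proposed above is a checksum appended when writing and verified when reading. The sketch below is only an assumption about how this might look; it uses CRC32 from the JDK and a hypothetical class name, and it says nothing about Flink's real checkpoint format.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.CRC32;

/** Hypothetical sketch: appends a CRC32 to serialized state and verifies it on read. */
class ChecksummedCheckpointSketch {

    /** Returns the state bytes followed by an 8-byte CRC32 value. */
    static byte[] seal(byte[] state) {
        CRC32 crc = new CRC32();
        crc.update(state, 0, state.length);
        return ByteBuffer.allocate(state.length + Long.BYTES)
                .put(state)
                .putLong(crc.getValue())
                .array();
    }

    /** Verifies the trailing checksum and returns the original state bytes. */
    static byte[] open(byte[] sealed) {
        if (sealed.length < Long.BYTES) {
            throw new IllegalStateException("Checkpoint is too short to contain a checksum");
        }
        int stateLength = sealed.length - Long.BYTES;
        CRC32 crc = new CRC32();
        crc.update(sealed, 0, stateLength);
        long stored = ByteBuffer.wrap(sealed, stateLength, Long.BYTES).getLong();
        if (stored != crc.getValue()) {
            throw new IllegalStateException("Checkpoint checksum mismatch");
        }
        return Arrays.copyOf(sealed, stateLength);
    }
}
```

Because the checksum is stored with the data, the check uses only what is in the checkpoint itself. A tool that rewrites state on purpose would have to call `seal` again after its changes.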
--
Jamie Grier created FLINK-4947:
--
Summary: Make all configuration possible via flink-conf.yaml and
CLI.
Key: FLINK-4947
URL: https://issues.apache.org/jira/browse/FLINK-4947
Project: Flink
Issue Type: Improvement
Components: DataStream API
Reporter: Jamie Grier
Fix For: 1.2.0
I think it's important to make all configuration possible via the
flink-conf.yaml and the command line.
As an example: To enable "externalizedCheckpoints" you must actually call the
StreamExecutionEnvironment#enableExternalizedCheckpoints() method from your
Flink program.
Another example of this would be configuring the RocksDB state backend.
I think it is important to make deployment flexible and easy to build tools
around. For example, the infrastructure teams that make these configuration
decisions and provide tools for deploying Flink apps will be different from
the teams deploying apps. The team writing apps should not have to set up all of
this lower-level configuration in their programs.
--
Greg Hogan created FLINK-4946:
-
Summary: Load jar files from subdirectories of lib
Key: FLINK-4946
URL: https://issues.apache.org/jira/browse/FLINK-4946
Project: Flink
Issue Type: Improvement
Components: Startup Shell Scripts
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor
Users can more easily track Flink jars with transitive dependencies when copied
into subdirectories of {{lib}}. This is the arrangement of {{opt}} for
FLINK-4861.
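The intended behaviour can be illustrated with a short Java sketch. Note that the actual change belongs in the startup shell scripts; the class and method below are hypothetical and only show the idea of collecting jar files from `lib` and all of its subdirectories into one classpath string.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of building a classpath from lib/ and its subdirectories. */
class LibClasspathSketch {

    /** Walks libDir recursively and joins all *.jar files with the path separator. */
    static String buildClasspath(Path libDir) throws IOException {
        try (Stream<Path> files = Files.walk(libDir)) {
            return files
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".jar"))
                    .map(Path::toString)
                    .sorted() // sorted only to make the result deterministic
                    .collect(Collectors.joining(File.pathSeparator));
        }
    }
}
```

Sorting is an assumption made here for reproducibility; the ticket does not say whether classpath order matters.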
--
[
https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612777#comment-15612777
]
ASF GitHub Bot commented on FLINK-4861:
---
Github user greghogan commented on the issue:
https://github.com/apache/flink/pull/2664
@StephanEwen latest commit assembles connectors into separate directories.
I'll create a ticket for loading jar files from subdirectories of `lib`.
> Package optional project artifacts
> --
>
> Key: FLINK-4861
> URL: https://issues.apache.org/jira/browse/FLINK-4861
> Project: Flink
> Issue Type: New Feature
> Components: Build System
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Per the mailing list
> [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html],
> package the Flink libraries and connectors into subdirectories of a new
> {{opt}} directory in the release/snapshot tarballs.
--
[
https://issues.apache.org/jira/browse/FLINK-4943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612697#comment-15612697
]
ASF GitHub Bot commented on FLINK-4943:
---
Github user Makman2 commented on the issue:
https://github.com/apache/flink/pull/2704
Feel free to cherry-pick my commit into your branch and close this PR :)
> flink-mesos/ConfigConstants: Typo: YYARN -> YARN
>
>
> Key: FLINK-4943
> URL: https://issues.apache.org/jira/browse/FLINK-4943
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
>Reporter: Mischa Krüger
>Priority: Trivial
>
--
[
https://issues.apache.org/jira/browse/FLINK-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612673#comment-15612673
]
ASF GitHub Bot commented on FLINK-4887:
---
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2699#discussion_r85386607
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+public class FutureUtils {
--- End diff --
There is already a class `FutureUtil` in `flink-core`.
Can you combine this class with that one?
> Replace ActorGateway by TaskManagerGateway interface
>
>
> Key: FLINK-4887
> URL: https://issues.apache.org/jira/browse/FLINK-4887
> Project: Flink
> Issue Type: Bug
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0
>
>
> Instead of directly accessing the {{ActorGateway}} in the {{Execution}}
> and {{ExecutionVertex}}, it would be better to decouple the two components by
> introducing the {{TaskGateway}} interface, which provides access to
> task-related RPC calls. The {{ActorGateway}} could be one implementation of the
> interface.
> This change will prepare the further implementation of Flip-6.
--
[
https://issues.apache.org/jira/browse/FLINK-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612675#comment-15612675
]
ASF GitHub Bot commented on FLINK-4887:
---
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2699#discussion_r85383836
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --
Leftover change
--
[
https://issues.apache.org/jira/browse/FLINK-4943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612690#comment-15612690
]
ASF GitHub Bot commented on FLINK-4943:
---
Github user greghogan commented on the issue:
https://github.com/apache/flink/pull/2704
I keep an `_minutiae` branch and push that every few months, typically
after a release. There are fixed costs at each level of Flink's ticketing /
pull request / verify / commit process.
--
[
https://issues.apache.org/jira/browse/FLINK-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612672#comment-15612672
]
ASF GitHub Bot commented on FLINK-4887:
---
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2699#discussion_r85392726
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.Preconditions;
+
+public class StackTrace {
--- End diff --
This also needs to be serializable
--
[
https://issues.apache.org/jira/browse/FLINK-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612674#comment-15612674
]
ASF GitHub Bot commented on FLINK-4887:
---
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2699#discussion_r85390985
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Response to the TriggerStackTraceSample message.
+ */
+public class StackTraceSampleResponse {
--- End diff --
This needs to be `java.io.Serializable`
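For readers unfamiliar with the requirement, a message class that is sent between processes is typically made serializable along these lines. This is a generic sketch, not the code from the pull request: the class name, the fields `sampleId` and `frames`, and the `roundTrip` helper are all hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical response message; field names are assumptions for illustration. */
class SampleResponseSketch implements Serializable {

    // Explicit version ID so that compatible changes keep the same serialized form.
    private static final long serialVersionUID = 1L;

    private final int sampleId;
    private final List<String> frames;

    SampleResponseSketch(int sampleId, List<String> frames) {
        this.sampleId = sampleId;
        this.frames = new ArrayList<>(frames); // ArrayList is itself Serializable
    }

    int getSampleId() {
        return sampleId;
    }

    List<String> getFrames() {
        return frames;
    }

    /** Serializes and deserializes the message, as a transport layer would. */
    static SampleResponseSketch roundTrip(SampleResponseSketch original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SampleResponseSketch) in.readObject();
        }
    }
}
```

Without `implements Serializable`, the `writeObject` call would fail with a `NotSerializableException`.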
--
[
https://issues.apache.org/jira/browse/FLINK-4943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612618#comment-15612618
]
ASF GitHub Bot commented on FLINK-4943:
---
Github user Makman2 commented on the issue:
https://github.com/apache/flink/pull/2704
Though I wouldn't fix more than this one (unless I stumble over some typos
again) :3
Is there already a larger ticket? @greghogan
--
[
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612488#comment-15612488
]
ASF GitHub Bot commented on FLINK-4939:
---
GitHub user kl0u opened a pull request:
https://github.com/apache/flink/pull/2707
[FLINK-4939] GenericWriteAheadSink: Decouple the creating from the
committing subtask for a pending checkpoint
So far the GenericWriteAheadSink expected that the subtask that wrote a
temporary buffer to the state backend upon checkpointing would also be the
one to commit it to the third-party storage system.
This commit removes this assumption. To do so, it changes the
CheckpointCommitter to take the subtaskIdx as a parameter both when
committing to the third-party storage system
( [void commitCheckpoint(int subtaskIdx, long checkpointID)] ) and when asking
whether a checkpoint was committed
( [boolean isCheckpointCommitted(int subtaskIdx, long checkpointID)] ). It also
changes the state kept by the [GenericWriteAheadSink] to include the index of
the subtask that wrote the pending buffer.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kl0u/flink write_ahead_sink
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2707.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2707
commit 207c5239c9fab6ef09b7bdd410ee83d3d8ca105f
Author: kl0u
Date: 2016-10-26T15:19:12Z
[FLINK-4939] GenericWriteAheadSink: Decouple the creating from the
committing subtask for a pending checkpoint
So far the GenericWriteAheadSink expected that
the subtask that wrote a temporary buffer to the
state backend, will be also the one to commit it to
the third-party storage system.
This commit removes this assumption. To do this
it changes the CheckpointCommitter to dynamically
take the subtaskIdx as a parameter when asking
if a checkpoint was committed and also changes the
state kept by the GenericWriteAheadSink to also
include that subtask index of the subtask that wrote
the pending buffer.
> GenericWriteAheadSink: Decouple the creating from the committing subtask for
> a pending checkpoint
> -
>
> Key: FLINK-4939
> URL: https://issues.apache.org/jira/browse/FLINK-4939
> Project: Flink
> Issue Type: Improvement
> Components: Cassandra Connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> So far the GenericWriteAheadSink expected that
> the subtask that wrote a pending checkpoint to the
> state backend would also be the one to commit it to
> the third-party storage system.
> This issue aims to remove this assumption. To do this,
> the CheckpointCommitter has to be able to dynamically
> take the subtaskIdx as a parameter when asking
> whether a checkpoint was committed, and the
> state kept by the GenericWriteAheadSink has to
> include the index of the subtask that wrote
> the pending checkpoint.
> This change is also necessary for making the operator rescalable.
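To illustrate the idea, here is a minimal in-memory sketch. Only the two method signatures (commitCheckpoint and isCheckpointCommitted, each taking subtaskIdx and checkpointID) come from the pull request text; the in-memory map, the PendingCheckpoint class and the commitIfNeeded helper are assumptions made up for this example and not the actual Flink implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: only the two method signatures are taken from the
// pull request description; everything else is illustrative.

/** Remembers, per subtask index, the highest checkpoint ID committed so far. */
class InMemoryCheckpointCommitter {
    private final Map<Integer, Long> lastCommitted = new HashMap<>();

    void commitCheckpoint(int subtaskIdx, long checkpointID) {
        lastCommitted.merge(subtaskIdx, checkpointID, Long::max);
    }

    boolean isCheckpointCommitted(int subtaskIdx, long checkpointID) {
        Long last = lastCommitted.get(subtaskIdx);
        return last != null && checkpointID <= last;
    }
}

/** State entry kept by the sink: the checkpoint and the subtask that wrote it. */
class PendingCheckpoint {
    final long checkpointId;
    final int subtaskIdx;

    PendingCheckpoint(long checkpointId, int subtaskIdx) {
        this.checkpointId = checkpointId;
        this.subtaskIdx = subtaskIdx;
    }
}

class WriteAheadSinkSketch {
    /**
     * Can be called by any subtask, not only the one that wrote the buffer,
     * because the writer's index travels with the pending checkpoint.
     * Returns true if this call performed the commit.
     */
    static boolean commitIfNeeded(InMemoryCheckpointCommitter committer,
                                  PendingCheckpoint pending) {
        if (committer.isCheckpointCommitted(pending.subtaskIdx, pending.checkpointId)) {
            return false; // already committed, nothing to replay
        }
        // A real sink would first send the buffered records to the external system.
        committer.commitCheckpoint(pending.subtaskIdx, pending.checkpointId);
        return true;
    }
}
```

Storing the writer's subtask index next to the checkpoint ID is what would let a restored or rescaled subtask finish a commit that a different subtask started.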
[
https://issues.apache.org/jira/browse/FLINK-4889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612480#comment-15612480
]
ASF GitHub Bot commented on FLINK-4889:
---
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/2698
> Make InstanceManager independent of ActorRef
>
>
> Key: FLINK-4889
> URL: https://issues.apache.org/jira/browse/FLINK-4889
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Currently, the {{InstanceManager}} strongly depends on {{ActorRef}}, because
> it uses the actor refs to distinguish task managers. I propose to make the
> {{InstanceManager}} independent of these {{ActorRefs}} because they are
> implementation dependent (see Flip-6).
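One way to picture the proposal is to key the registered task managers by an opaque identifier instead of by actor reference. The sketch below is purely hypothetical: TaskManagerConnection, InstanceManagerSketch and the use of a String as the ID are assumptions for illustration and do not reflect the actual Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the class names are illustrative, not Flink's API.

/** Whatever is needed to talk to a task manager; it could wrap an ActorRef. */
interface TaskManagerConnection {
    String address();
}

/** Tracks task managers by an opaque ID rather than by their actor reference. */
class InstanceManagerSketch {
    private final Map<String, TaskManagerConnection> registered = new HashMap<>();

    /** Returns false if a task manager with this ID is already registered. */
    boolean register(String taskManagerId, TaskManagerConnection connection) {
        return registered.putIfAbsent(taskManagerId, connection) == null;
    }

    boolean isRegistered(String taskManagerId) {
        return registered.containsKey(taskManagerId);
    }

    void unregister(String taskManagerId) {
        registered.remove(taskManagerId);
    }

    int numberOfRegisteredTaskManagers() {
        return registered.size();
    }
}
```

With this shape, the bookkeeping no longer cares how the connection is implemented, so a non-actor RPC layer could be plugged in later.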
[
https://issues.apache.org/jira/browse/FLINK-4945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612477#comment-15612477
]
ASF GitHub Bot commented on FLINK-4945:
---
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/2706
R @rmetzger
> KafkaConsumer logs wrong warning about confirmation for unknown checkpoint
> --
>
> Key: FLINK-4945
> URL: https://issues.apache.org/jira/browse/FLINK-4945
> Project: Flink
> Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> Checkpoints are currently not registered in all cases. While the code still
> behaves correctly this leads to misleading warnings.
[
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kostas Kloudas updated FLINK-4939:
--
Summary: GenericWriteAheadSink: Decouple the creating from the committing
subtask for a pending checkpoint (was: GenericWriteAheadSink: Decouple the
subtask creating a pending checkpoint from the one commiting it.)
[
https://issues.apache.org/jira/browse/FLINK-4945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612472#comment-15612472
]
ASF GitHub Bot commented on FLINK-4945:
---
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/2706
[FLINK-4945] KafkaConsumer logs wrong warning about confirmation for …
This PR fixes an unjustified warning about unknown checkpoints in
FlinkKafkaConsumerBase.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink fix-kafka-warning
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2706.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2706
[
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kostas Kloudas updated FLINK-4939:
--
Summary: GenericWriteAheadSink: Decouple the subtask creating a pending
checkpoint from the one commiting it. (was: GenericWriteAheadSink: Decouple
the subtask that created a pending checkpoint from the one that commits it.)
[
https://issues.apache.org/jira/browse/FLINK-4823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612467#comment-15612467
]
Greg Hogan commented on FLINK-4823:
---
How are you creating your {{Graph}}? Each vertex represented in the edge set
should be in the vertex set.
Based on this exception we do need to add better error checking with a more
specific log message.
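A quick way to check the condition described above before building the graph is to look for edge endpoints that have no matching vertex. This is a plain-Java sketch and does not use the Gelly API; the class name, the long vertex IDs and the long[] edge representation are assumptions chosen for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, independent of Gelly: edges are given as {source, target}.
class EdgeEndpointCheck {

    /** Returns every vertex ID that appears in an edge but not in the vertex set. */
    static Set<Long> missingVertices(Set<Long> vertexIds, List<long[]> edges) {
        Set<Long> missing = new TreeSet<>();
        for (long[] edge : edges) {
            if (!vertexIds.contains(edge[0])) {
                missing.add(edge[0]);
            }
            if (!vertexIds.contains(edge[1])) {
                missing.add(edge[1]);
            }
        }
        return missing;
    }
}
```

If the returned set is not empty, the input data violates the requirement that each vertex in the edge set is also in the vertex set. Whether that is the cause of the reported exception is not confirmed in this thread.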
> org.apache.flink.types.NullFieldException: Field 0 is null, but expected to
> hold a value
>
>
> Key: FLINK-4823
> URL: https://issues.apache.org/jira/browse/FLINK-4823
> Project: Flink
> Issue Type: Bug
> Components: Gelly
>Affects Versions: 1.1.0
> Environment: RHEL 6.6
>Reporter: Sajeev Ramakrishnan
>
> Team,
> We are getting NULL pointer exception while doing the vertex centric graph
> traversal.
> org.apache.flink.types.NullFieldException: Field 0 is null, but expected to
> hold a value.
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:126)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
> at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> at
> org.apache.flink.api.java.operators.JoinOperator$DefaultJoin$WrappingFlatJoinFunction.join(JoinOperator.java:572)
> at
> org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver.run(JoinWithSolutionSetFirstDriver.java:196)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at
> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> at
> org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:122)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
> ... 13 more
> All the parameters that I am passing for the vertex and edges are not null.
> Not able to find out the root cause.
> Thanks & Regards,
> Sajeev Ramakrishnan
[
https://issues.apache.org/jira/browse/FLINK-4889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612452#comment-15612452
]
ASF GitHub Bot commented on FLINK-4889:
---
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/2698
Looks good
+1 to merge this
[
https://issues.apache.org/jira/browse/FLINK-4945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stefan Richter reassigned FLINK-4945:
-
Assignee: Stefan Richter
Stefan Richter created FLINK-4945:
-
Summary: KafkaConsumer logs wrong warning about confirmation for
unknown checkpoint
Key: FLINK-4945
URL: https://issues.apache.org/jira/browse/FLINK-4945
Project: Flink
Issue Type: Bug
Reporter: Stefan Richter
Priority: Minor
Checkpoints are currently not registered in all cases. While the code still
behaves correctly this leads to misleading warnings.
[
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612315#comment-15612315
]
ASF GitHub Bot commented on FLINK-2608:
---
Github user chermenin commented on a diff in the pull request:
https://github.com/apache/flink/pull/2623#discussion_r85369265
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
---
@@ -710,7 +712,103 @@ public String toString() {
pwc2.bigInt =
BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;
-
+
+ GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+ pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); //
1976
+
+
+ data.add(pwc1);
+ data.add(pwc2);
+
+ return env.fromCollection(data);
+ }
+
+ public static DataSet
getPojoWithArraysAsListCollection(ExecutionEnvironment env) {
--- End diff --
I think it can be left. Just on the safe side.
> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the
> serialization/deserialization fails when deploying the task.
> See the following program:
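> {code:java}
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
>
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> text = env.fromElements(
>             "Who's there?",
>             "I think I hear them. Stand, ho! Who's there?");
>         // DOES NOT WORK
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works:
>         //List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>             .flatMap(new LineSplitter())
>             .withBroadcastSet(set, "set")
>             .groupBy(0)
>             .sum(1);
>         wordCounts.print();
>     }
>     // Splits each line on spaces and emits (word, 1) pairs.
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>     // Serializable wrapper whose list field triggers the reported failure.
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>         List<Integer> integerList;
>         public TestClass(List<Integer> integerList){
>             this.integerList=integerList;
>         }
>     }
> }
> {code}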
> {noformat}
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task
> 'DataSource (at main(Test.java:32)
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at
>
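The workaround hinted at in the report can be sketched in plain Java. Arrays.asList returns a fixed-size list backed by a private nested class of java.util.Arrays, while copying it yields a regular java.util.ArrayList, which the reporter says works. The helper name below is made up for illustration, and the sketch does not explain why deserialization fails inside Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: copies any list into a plain java.util.ArrayList.
class ListCopyWorkaround {

    /** Returns a java.util.ArrayList holding the same elements as the source list. */
    static <T> List<T> toArrayList(List<T> source) {
        return new ArrayList<>(source);
    }
}
```

In the program above this would mean passing ListCopyWorkaround.toArrayList(Arrays.asList(0, 0, 0)) to the TestClass constructor instead of the Arrays.asList result itself.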
[
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612306#comment-15612306
]
ASF GitHub Bot commented on FLINK-2608:
---
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/2623
I like this approach. One minor question, otherwise +1 to go here
> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the
> serialization/deserialization fails when deploying the task.
> See the following program:
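> {code:java}
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
>
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> text = env.fromElements(
>             "Who's there?",
>             "I think I hear them. Stand, ho! Who's there?");
>         // DOES NOT WORK
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works:
>         //List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>             .flatMap(new LineSplitter())
>             .withBroadcastSet(set, "set")
>             .groupBy(0)
>             .sum(1);
>         wordCounts.print();
>     }
>     // Splits each line on spaces and emits (word, 1) pairs.
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>     // Serializable wrapper whose list field triggers the reported failure.
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>         List<Integer> integerList;
>         public TestClass(List<Integer> integerList){
>             this.integerList=integerList;
>         }
>     }
> }
> {code}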
> {noformat}
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task
> 'DataSource (at main(Test.java:32)
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
[
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612301#comment-15612301
]
ASF GitHub Bot commented on FLINK-2608:
---
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2623#discussion_r85368023
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
---
@@ -710,7 +712,103 @@ public String toString() {
pwc2.bigInt =
BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;
-
+
+ GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+ pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); //
1976
+
+
+ data.add(pwc1);
+ data.add(pwc2);
+
+ return env.fromCollection(data);
+ }
+
+ public static DataSet
getPojoWithArraysAsListCollection(ExecutionEnvironment env) {
--- End diff --
Are these still needed any more by the tests?
> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet text = env.fromElements(
> "Who's there?",
> "I think I hear them. Stand, ho! Who's there?");
> // DOES NOT WORK
> List elements = Arrays.asList(0, 0, 0);
> // The following works:
> //List elements = new ArrayList<>(new int[] {0,0,0});
> DataSet set = env.fromElements(new TestClass(elements));
> DataSet> wordCounts = text
> .flatMap(new LineSplitter())
> .withBroadcastSet(set, "set")
> .groupBy(0)
> .sum(1);
> wordCounts.print();
> }
> public static class LineSplitter implements FlatMapFunction Tuple2> {
> @Override
> public void flatMap(String line, Collector Integer>> out) {
> for (String word : line.split(" ")) {
> out.collect(new Tuple2(word, 1));
> }
> }
> }
> public static class TestClass implements Serializable {
> private static final long serialVersionUID = -2932037991574118651L;
> List integerList;
> public TestClass(List integerList){
> this.integerList=integerList;
> }
> }
> {code}
> {noformat}
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task
> 'DataSource (at main(Test.java:32)
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at
>
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2623#discussion_r85368023
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
---
@@ -710,7 +712,103 @@ public String toString() {
pwc2.bigInt =
BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;
-
+
+ GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+ pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); //
1976
+
+
+ data.add(pwc1);
+ data.add(pwc2);
+
+ return env.fromCollection(data);
+ }
+
+ public static DataSet
getPojoWithArraysAsListCollection(ExecutionEnvironment env) {
--- End diff --
Are these still needed by the tests?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[
https://issues.apache.org/jira/browse/FLINK-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612284#comment-15612284
]
ASF GitHub Bot commented on FLINK-4733:
---
Github user rmetzger commented on the issue:
https://github.com/apache/flink/pull/2616
I tested the change locally, it works. +1 to merge.
> Port WebFrontend to new metric system
> -
>
> Key: FLINK-4733
> URL: https://issues.apache.org/jira/browse/FLINK-4733
> Project: Flink
> Issue Type: Improvement
> Components: Metrics, TaskManager, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0
>
>
> While the WebFrontend has access to the metric system it still relies on
> older code in some parts.
> The TaskManager metrics are still gathered using the Codahale library and
> sent with the heartbeats.
> Task related metrics (numRecordsIn etc) are still gathered using
> accumulators, which are accessed through the execution graph.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612219#comment-15612219
]
ASF GitHub Bot commented on FLINK-4552:
---
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85361208
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness {
+
+ private static final Integer KEY = 1;
+
+ private final Trigger trigger;
+ private final TypeSerializer windowSerializer;
+
+ private final HeapKeyedStateBackend stateBackend;
+ private final TestInternalTimerService internalTimerService;
+
+ public TriggerTestHarness(
+ Trigger trigger,
+ TypeSerializer windowSerializer) throws Exception {
+ this.trigger = trigger;
+ this.windowSerializer = windowSerializer;
+
+ // we only ever use one key, other tests make sure that windows
work across different
+ // keys
+ DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ @SuppressWarnings("unchecked")
+ HeapKeyedStateBackend stateBackend =
(HeapKeyedStateBackend) backend.createKeyedStateBackend(dummyEnv,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ new KvStateRegistry().createTaskRegistry(new
JobID(), new JobVertexID()));
+ this.stateBackend = stateBackend;
+
+ this.stateBackend.setCurrentKey(0);
+
+ this.internalTimerService = new TestInternalTimerService<>(new
KeyContext() {
+ @Override
+
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/2690
---
[
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612217#comment-15612217
]
ASF GitHub Bot commented on FLINK-4552:
---
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85360067
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsWindowsTest.java
---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link EventTimeSessionWindows}
+ */
+public class EventTimeSessionWindowsWindowsTest extends TestLogger {
+
+ @Test
+ public void testWindowAssignment() {
+ WindowAssigner.WindowAssignerContext mockContext =
+
mock(WindowAssigner.WindowAssignerContext.class);
+
+ EventTimeSessionWindows assigner =
EventTimeSessionWindows.withGap(Time.milliseconds(5000));
--- End diff --
Very minor, but I think a named constant for the value 5000 might make the
contracts even clearer (e.g. int gap = 5000) and then do (+ gap) to window
start time to obtain end time.
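The suggestion above could look roughly like the following. This is a plain-Java sketch, not code from the pull request: `SessionGapSketch`, `GAP_MS` and `assignWindow` are hypothetical names, and the window is modelled as a `{start, end}` pair instead of Flink's `TimeWindow` so that the example runs without Flink on the classpath.

```java
// Hypothetical illustration of the review suggestion; not part of Flink or the PR.
final class SessionGapSketch {

    /** Session gap in milliseconds, named once instead of repeating the literal 5000. */
    static final long GAP_MS = 5000L;

    /** Returns {start, end} of the session window opened by a single element. */
    static long[] assignWindow(long timestamp) {
        // The end time is derived from the start time and the named gap.
        return new long[] {timestamp, timestamp + GAP_MS};
    }

    public static void main(String[] args) {
        long[] window = assignWindow(17L);
        System.out.println("window = [" + window[0] + ", " + window[1] + ")");
    }
}
```

In a test, expected window bounds can then be written as `start + GAP_MS`, so the relationship between the configured gap and the asserted end time is visible at the assertion site.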
> Refactor WindowOperator/Trigger Tests
> -
>
> Key: FLINK-4552
> URL: https://issues.apache.org/jira/browse/FLINK-4552
> Project: Flink
> Issue Type: Improvement
> Components: Windowing Operators
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and
> {{WindowFunction}} are all conflated in {{WindowOperatorTest}}. All of these
> tests check that a certain combination of a {{Trigger}}, {{WindowAssigner}} and
> {{WindowFunction}} produces the expected output.
> We should modularize these tests and spread them out across multiple files,
> possibly one file per trigger for the trigger tests. Also, we should extend/change the
> tests in some key ways:
> - {{WindowOperatorTest}} test should just verify that the interaction
> between {{WindowOperator}} and the various other parts works as expected,
> that the correct methods on {{Trigger}} and {{WindowFunction}} are called at
> the expected time and that snapshotting, timers, cleanup etc. work correctly.
> These tests should also verify that the different state types and
> {{WindowFunctions}} work correctly.
> - {{Trigger}} tests should present elements to triggers and verify that they
> fire at the correct times. The actual output of the {{WindowFunction}} is not
> important for these tests. We should also test that triggers correctly
[
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612273#comment-15612273
]
ASF GitHub Bot commented on FLINK-4715:
---
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/2652
> TaskManager should commit suicide after cancellation failure
>
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
> Issue Type: Improvement
> Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0, 1.1.4
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a
> given time, the {{TaskManager}} should kill itself. That way we guarantee
> that there is no resource leak.
> This behaviour acts as a safety-net against faulty user code.
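The idea described in the issue can be sketched as a small watchdog. This is only an illustration under stated assumptions: `CancellationWatchdogSketch`, `awaitCancellation` and the `fatalAction` callback are hypothetical names, not the TaskManager's actual implementation, and the fatal action is injected so that the example does not terminate the JVM.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a cancellation watchdog; not Flink's implementation.
final class CancellationWatchdogSketch {

    /**
     * Waits up to timeoutMillis for the task to signal that cancellation finished.
     * If it does not, runs fatalAction (in a real process this might exit the JVM).
     * Returns true if cancellation completed in time.
     */
    static boolean awaitCancellation(CountDownLatch cancelled, long timeoutMillis, Runnable fatalAction) {
        boolean finished;
        try {
            finished = cancelled.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // In this sketch an interrupted wait is treated like a timeout.
            Thread.currentThread().interrupt();
            finished = false;
        }
        if (!finished) {
            fatalAction.run();
        }
        return finished;
    }

    public static void main(String[] args) {
        // A latch that is never counted down simulates user code that ignores cancellation.
        CountDownLatch neverCancelled = new CountDownLatch(1);
        boolean ok = awaitCancellation(neverCancelled, 100L,
                () -> System.out.println("cancellation timed out; a real process would exit here"));
        System.out.println("cancelled in time: " + ok);
    }
}
```

Injecting the fatal action keeps the timeout logic testable; which exit mechanism the real process uses is outside the scope of this sketch.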
[
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ufuk Celebi closed FLINK-4715.
--
Resolution: Fixed
Fix Version/s: 1.1.4
Fixed in cc6655b (release-1.1), 27fd249 (master).
[
https://issues.apache.org/jira/browse/FLINK-4894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612253#comment-15612253
]
ASF GitHub Bot commented on FLINK-4894:
---
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/2690
> Don't block on buffer request after broadcastEvent
> ---
>
> Key: FLINK-4894
> URL: https://issues.apache.org/jira/browse/FLINK-4894
> Project: Flink
> Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0, 1.1.4
>
>
> After broadcasting an event (like the checkpoint barrier), the record writer
> might block on a buffer request although that buffer will only be needed on
> the next write on that channel.
> Instead of assuming that each serializer has a buffer set, we can change the
> logic in the writer to request the buffer when it requires one.
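The change proposed in the issue amounts to requesting buffers lazily. The following plain-Java sketch illustrates the pattern only: `LazyBufferWriterSketch` and all of its members are hypothetical, a `StringBuilder` stands in for a network buffer, and the potentially blocking request is reduced to a counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of lazy buffer requests; not Flink's RecordWriter.
final class LazyBufferWriterSketch {

    private final StringBuilder[] buffers;            // null means "no buffer held for this channel"
    private final List<String> flushed = new ArrayList<>();
    private int bufferRequests = 0;

    LazyBufferWriterSketch(int numChannels) {
        buffers = new StringBuilder[numChannels];
    }

    /** Requests a buffer only when a record is actually written to the channel. */
    void write(int channel, String record) {
        if (buffers[channel] == null) {
            buffers[channel] = requestBuffer();
        }
        buffers[channel].append(record);
    }

    /** Flushes pending data and sends the event, without requesting replacement buffers. */
    void broadcastEvent(String event) {
        for (int channel = 0; channel < buffers.length; channel++) {
            if (buffers[channel] != null) {
                flushed.add(channel + ":" + buffers[channel]);
                buffers[channel] = null;
            }
            flushed.add(channel + ":" + event);
        }
    }

    int bufferRequests() {
        return bufferRequests;
    }

    List<String> flushed() {
        return flushed;
    }

    private StringBuilder requestBuffer() {
        bufferRequests++; // in a real writer this call might block until a buffer is available
        return new StringBuilder();
    }

    public static void main(String[] args) {
        LazyBufferWriterSketch writer = new LazyBufferWriterSketch(2);
        writer.write(0, "a");
        writer.broadcastEvent("barrier");
        System.out.println("requests after broadcast: " + writer.bufferRequests());
        System.out.println("flushed: " + writer.flushed());
    }
}
```

Because the broadcast path never calls `requestBuffer()`, it cannot block on a buffer that would only be needed by a later write.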
[
https://issues.apache.org/jira/browse/FLINK-4894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ufuk Celebi closed FLINK-4894.
--
Resolution: Fixed
Fix Version/s: 1.1.4
1.2.0
Fixed in 529534f (release-1.1), cbdb784 (master).
[
https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ufuk Celebi reopened FLINK-4510:
> Always create CheckpointCoordinator
> ---
>
> Key: FLINK-4510
> URL: https://issues.apache.org/jira/browse/FLINK-4510
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Jark Wu
> Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator is only created if a checkpointing interval is
> configured. This means that no savepoints can be triggered if there is no
> checkpointing interval specified.
> Instead we should always create it and allow an interval of 0 for disabled
> periodic checkpoints.
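A minimal sketch of the proposed behaviour, assuming the convention from the description that an interval of 0 disables periodic checkpoints. `CoordinatorSketch` and its methods are hypothetical names and do not mirror the real `CheckpointCoordinator` API.

```java
// Hypothetical sketch; not Flink's CheckpointCoordinator.
final class CoordinatorSketch {

    private final long intervalMillis;
    private int savepointsTriggered = 0;

    /** The coordinator is always constructed; an interval of 0 only disables periodic checkpoints. */
    CoordinatorSketch(long intervalMillis) {
        if (intervalMillis < 0) {
            throw new IllegalArgumentException("interval must be >= 0");
        }
        this.intervalMillis = intervalMillis;
    }

    boolean periodicCheckpointsEnabled() {
        return intervalMillis > 0;
    }

    /** Savepoints can be triggered manually regardless of the configured interval. */
    int triggerSavepoint() {
        return ++savepointsTriggered;
    }

    public static void main(String[] args) {
        CoordinatorSketch coordinator = new CoordinatorSketch(0L);
        System.out.println("periodic enabled: " + coordinator.periodicCheckpointsEnabled());
        System.out.println("savepoints triggered: " + coordinator.triggerSavepoint());
    }
}
```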
[
https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612226#comment-15612226
]
Ufuk Celebi commented on FLINK-4510:
Fixed in {{b420750}} (release-1.1).
[
https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ufuk Celebi closed FLINK-4510.
--
Resolution: Fixed
Fix Version/s: 1.1.4
[
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612221#comment-15612221
]
ASF GitHub Bot commented on FLINK-4552:
---
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85360454
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link EventTimeTrigger}.
+ */
+public class EventTimeTriggerTest {
+
+ /**
+* Verify that state of separate windows does not leak into other
windows.
+*/
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness
[
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612215#comment-15612215
]
ASF GitHub Bot commented on FLINK-4552:
---
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85359406
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+ /**
+* Verify that state of separate windows does not leak into other
windows.
+*/
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85361303
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility for testing {@link Trigger} behaviour.
+ */
+public class TriggerTestHarness {
+
+ private static final Integer KEY = 1;
+
+ private final Trigger trigger;
+ private final TypeSerializer windowSerializer;
+
+ private final HeapKeyedStateBackend stateBackend;
+ private final TestInternalTimerService internalTimerService;
+
+ public TriggerTestHarness(
+ Trigger trigger,
+ TypeSerializer windowSerializer) throws Exception {
+ this.trigger = trigger;
+ this.windowSerializer = windowSerializer;
+
+ // we only ever use one key, other tests make sure that windows work across different
+ // keys
+ DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ @SuppressWarnings("unchecked")
+ HeapKeyedStateBackend stateBackend = (HeapKeyedStateBackend) backend.createKeyedStateBackend(dummyEnv,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+ this.stateBackend = stateBackend;
+
+ this.stateBackend.setCurrentKey(0);
+
+ this.internalTimerService = new TestInternalTimerService<>(new KeyContext() {
+ @Override
+ public void setCurrentKey(Object key) {
+ // ignore
+ }
+
+ @Override
+ public Object getCurrentKey() {
+
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85360454
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java
---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link EventTimeTrigger}.
+ */
+public class EventTimeTriggerTest {
+
+ /**
+* Verify that state of separate windows does not leak into other windows.
+*/
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness
[
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612218#comment-15612218
]
ASF GitHub Bot commented on FLINK-4552:
---
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85359630
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+ /**
+* Verify that state of separate windows does not leak into other windows.
+*/
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85359630
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+ /**
+* Verify that state of separate windows does not leak into other windows.
+*/
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness testHarness =
+ new TriggerTestHarness<>(CountTrigger.of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(2, 4)));
+
+ // right now, CountTrigger will clear its state in onElement when firing
+ // ideally, this should be moved to onFire()
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+ }
+
+ /**
+* Verify that clear() does not leak across windows.
+*/
+ @Test
+ public void testClear() throws Exception {
+ TriggerTestHarness testHarness =
+ new TriggerTestHarness<>(CountTrigger.of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ testHarness.clearTriggerState(new TimeWindow(2, 4));
+
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+
[
https://issues.apache.org/jira/browse/FLINK-4552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612216#comment-15612216
]
ASF GitHub Bot commented on FLINK-4552:
---
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85360968
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java
---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyOnMergeContext;
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTimeWindow;
+import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest.anyTriggerContext;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link PurgingTrigger}.
+ */
+public class PurgingTriggerTest {
+
+ /**
+* Check if {@link PurgingTrigger} implements all methods of {@link Trigger}, as a sanity
+* check.
+*/
+ @Test
+ public void testAllMethodsImplemented() throws NoSuchMethodException {
--- End diff --
What is the purpose of this and how is it different from checking that
PurgingTrigger is not abstract?
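One possible answer to the question above, sketched outside Flink: a class can be non-abstract and still fail to forward methods for which the base class already provides a default implementation. The classes `BaseTrigger` and `WrappingTrigger` and the helper `notOverridden` are hypothetical stand-ins, not the actual `Trigger`, `PurgingTrigger`, or the test under review; the sketch only contrasts the two kinds of check.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class ForwardingCheck {

    /** Hypothetical base class: one abstract method, one with a default implementation. */
    public abstract static class BaseTrigger {
        public abstract String onElement();

        public boolean canMerge() {
            return false;
        }
    }

    /** Hypothetical wrapper that forwards onElement() but does not forward canMerge(). */
    public static class WrappingTrigger extends BaseTrigger {
        private final BaseTrigger nested;

        public WrappingTrigger(BaseTrigger nested) {
            this.nested = nested;
        }

        @Override
        public String onElement() {
            return nested.onElement();
        }
    }

    /** Returns the names of public methods declared by base that sub does not declare itself. */
    public static List<String> notOverridden(Class<?> base, Class<?> sub) {
        List<String> missing = new ArrayList<>();
        for (Method m : base.getDeclaredMethods()) {
            if (!Modifier.isPublic(m.getModifiers()) || m.isSynthetic()) {
                continue; // skip non-public and compiler-generated methods
            }
            try {
                sub.getDeclaredMethod(m.getName(), m.getParameterTypes());
            } catch (NoSuchMethodException e) {
                missing.add(m.getName());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // The wrapper compiles and is not abstract ...
        System.out.println(Modifier.isAbstract(WrappingTrigger.class.getModifiers())); // prints false
        // ... yet it silently inherits the default canMerge() instead of forwarding it.
        System.out.println(notOverridden(BaseTrigger.class, WrappingTrigger.class)); // prints [canMerge]
    }
}
```

Under this reading, a "not abstract" check only covers abstract methods, while the reflective check also catches inherited defaults that a wrapper should delegate.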
> Refactor WindowOperator/Trigger Tests
> -
>
> Key: FLINK-4552
> URL: https://issues.apache.org/jira/browse/FLINK-4552
> Project: Flink
> Issue Type: Improvement
> Components: Windowing Operators
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now, tests for {{WindowOperator}}, {{WindowAssigner}}, {{Trigger}} and
> {{WindowFunction}} are all conflated in {{WindowOperatorTest}}. All of these
> test that a certain combination of a {{Trigger}}, {{WindowAssigner}} and
> {{WindowFunction}} produce the expected output.
> We should modularize these tests and spread them out across multiple files,
> possibly one per trigger, for the triggers. Also, we should extend/change the
> tests in some key ways:
> - {{WindowOperatorTest}} test should just verify that the interaction
> between {{WindowOperator}} and the various other parts works as expected,
> that the correct methods on {{Trigger}} and {{WindowFunction}} are called at
> the expected time and that snapshotting, timers, cleanup etc. work correctly.
> These tests should also verify that the different state types and
> {{WindowFunctions}} work correctly.
> - {{Trigger}} tests should present elements to triggers and verify that they
> fire at the correct times. The actual output of the {{WindowFunction}} is not
> important for these tests. We should also test that triggers correctly clean
> up state and timers.
> - {{WindowAssigner}} tests should test each window assigner and also verify
> that, for example, the offset parameter of time-based windows works correctly.
> There is already {{WindowingTestHarness}} but it is not used by tests, I
> think we can expand on that and provide more thorough test coverage while
> also making the tests more maintainable ({{WindowOperatorTest.java}} is
>
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2572#discussion_r85359406
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CountTrigger}.
+ */
+public class CountTriggerTest {
+
+ /**
+* Verify that state of separate windows does not leak into other windows.
+*/
+ @Test
+ public void testWindowSeparationAndFiring() throws Exception {
+ TriggerTestHarness testHarness =
+ new TriggerTestHarness<>(CountTrigger.of(3), new TimeWindow.Serializer());
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(2, 4)));
+
+ // shouldn't have any timers
+ assertEquals(0, testHarness.numProcessingTimeTimers());
+ assertEquals(0, testHarness.numEventTimeTimers());
+
+ assertEquals(2, testHarness.numStateEntries());
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord(1), new TimeWindow(0, 2)));
+ assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), new TimeWindow(2, 4)));
+
+ // right now, CountTrigger will clear its state in onElement when firing
+ // ideally, this should be moved to onFire()
+ assertEquals(1, testHarness.numStateEntries());
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)));
+ assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4)));
+ }
--- End diff --
For completeness, you could also let W(2,4) fire, to verify that its count was
not reset by the previous firing of W(0,2).
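The suggested extra step can be illustrated without the Flink test harness. The following is a minimal stand-alone sketch, assuming a simplified count trigger that keeps one counter per window and clears it when it fires; `CountTriggerModel` and its methods are hypothetical and only model the behaviour the test asserts, they are not Flink's `CountTrigger` or `TriggerTestHarness`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone model of a per-window count trigger; not Flink's CountTrigger. */
public class CountTriggerModel {

    public enum Result { CONTINUE, FIRE }

    private final long maxCount;
    // One counter per window, keyed by "start-end" for simplicity.
    private final Map<String, Long> counts = new HashMap<>();

    public CountTriggerModel(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Counts one element for the window [start, end) and fires once maxCount is reached. */
    public Result processElement(long start, long end) {
        String window = start + "-" + end;
        long count = counts.merge(window, 1L, Long::sum);
        if (count >= maxCount) {
            counts.remove(window); // state is cleared when firing, as the test comment notes
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    /** Number of windows that currently hold a counter. */
    public int numStateEntries() {
        return counts.size();
    }

    public static void main(String[] args) {
        CountTriggerModel trigger = new CountTriggerModel(3);

        // Same sequence as the test: W(0,2) receives three elements, W(2,4) two.
        trigger.processElement(0, 2);
        trigger.processElement(2, 4);
        trigger.processElement(0, 2);
        System.out.println(trigger.processElement(0, 2)); // third element of W(0,2)
        System.out.println(trigger.processElement(2, 4)); // second element of W(2,4)

        // The extra step suggested in the review: the third element of W(2,4)
        // fires only if the firing of W(0,2) left the count of W(2,4) untouched.
        System.out.println(trigger.processElement(2, 4));
        System.out.println(trigger.numStateEntries());
    }
}
```

In the real test this would presumably amount to one more `processElement` assertion for `new TimeWindow(2, 4)` expecting `TriggerResult.FIRE`, followed by a check that no state entries remain.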
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[
https://issues.apache.org/jira/browse/FLINK-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612172#comment-15612172
]
ASF GitHub Bot commented on FLINK-4733:
---
Github user rmetzger commented on the issue:
https://github.com/apache/flink/pull/2616
Thank you for rebasing.
This run:
had the following error:
https://s3.amazonaws.com/archive.travis-ci.org/jobs/170428661/log.txt
```
Failed tests:
CoordinatorShutdownTest.testCoordinatorShutsDownOnFailure:94
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph cannot be cast
to org.apache.flink.runtime.executiongraph.ExecutionGraph
```
I suspect we need to change the cast there to `AccessExecutionGraph`.
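The suspected fix can be sketched as follows, assuming that both graph classes share a common read-only interface, as the comment implies. `AccessGraph`, `LiveGraph` and `ArchivedGraph` are hypothetical stand-ins for `AccessExecutionGraph`, `ExecutionGraph` and `ArchivedExecutionGraph`; the sketch does not reproduce `CoordinatorShutdownTest`.

```java
public class CastSketch {

    /** Hypothetical stand-in for the shared read-only interface. */
    public interface AccessGraph {
        String getJobName();
    }

    /** Hypothetical stand-in for the live graph. */
    public static class LiveGraph implements AccessGraph {
        @Override
        public String getJobName() {
            return "live";
        }
    }

    /** Hypothetical stand-in for the archived graph. */
    public static class ArchivedGraph implements AccessGraph {
        @Override
        public String getJobName() {
            return "archived";
        }
    }

    /** Casting to the concrete class fails as soon as an archived graph is returned. */
    public static String jobNameViaConcreteCast(Object graph) {
        return ((LiveGraph) graph).getJobName();
    }

    /** Casting to the shared interface accepts both implementations. */
    public static String jobNameViaInterface(Object graph) {
        return ((AccessGraph) graph).getJobName();
    }

    public static void main(String[] args) {
        Object response = new ArchivedGraph();
        System.out.println(jobNameViaInterface(response)); // prints archived
        try {
            jobNameViaConcreteCast(response);
        } catch (ClassCastException e) {
            System.out.println("concrete cast failed");
        }
    }
}
```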
> Port WebFrontend to new metric system
> -
>
> Key: FLINK-4733
> URL: https://issues.apache.org/jira/browse/FLINK-4733
> Project: Flink
> Issue Type: Improvement
> Components: Metrics, TaskManager, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0
>
>
> While the WebFrontend has access to the metric system it still relies on
> older code in some parts.
> The TaskManager metrics are still gathered using the Codahale library and
> sent with the heartbeats.
> Task related metrics (numRecordsIn etc) are still gathered using
> accumulators, which are accessed through the execution graph.
[
https://issues.apache.org/jira/browse/FLINK-4756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ruwen Moos resolved FLINK-4756.
---
Resolution: Not A Bug
The Flink versions of the cluster and of the Maven project were too far apart.
If you get this error, check that the Flink version used for the build matches
the version running on the cluster.
> NullPointerException on submiting a job with
> StreamExecutionEnvironment.createRemoteEnvironment to a flink cluster
> --
>
> Key: FLINK-4756
> URL: https://issues.apache.org/jira/browse/FLINK-4756
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management
>Affects Versions: 1.1.2
> Environment: cluster@centOS7 in a docker container
>Reporter: Ruwen Moos
>
> Flink cluster throws the following exception when I try to send my job to my
> Flink cluster with StreamExecutionEnvironment.createRemoteEnvironment.
> 2016-10-04 07:29:46,106 ERROR org.apache.flink.runtime.jobmanager.JobManager
> - Failed to submit job 08f41104b8b523380fbc8f0d7d1da6f1 (Flink
> Java Job at Tue Oct 04 09:29:37 CEST 2016)
> java.lang.NullPointerException
> at
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:478)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:121)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> The Flink program runs as a standalone Flink program with
> StreamExecutionEnvironment.getExecutionEnvironment() without any issues. With
> getExecutionEnvironment(), uploading via the web GUI works when running it on
> the cluster, just not via a RemoteStreamEnvironment.
> The same exception also happens when using a local cluster on Windows.
> I also tried running the wordcount example from flink-examples on the
> cluster, same result.
> Jars for the job are created with the maven-assembly-plugin.
Github user rmetzger commented on the issue:
https://github.com/apache/flink/pull/2694
+1 to merge.
[
https://issues.apache.org/jira/browse/FLINK-2597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612146#comment-15612146
]
ASF GitHub Bot commented on FLINK-2597:
---
GitHub user rmetzger opened a pull request:
https://github.com/apache/flink/pull/2705
[FLINK-2597][FLINK-4050] Add wrappers for Kafka serializers, test for
partitioner and documentation
This pull request addresses the following JIRAs:
- [FLINK-2597 Add a test for Avro-serialized Kafka messages](https://issues.apache.org/jira/browse/FLINK-2597)
- [FLINK-4050 FlinkKafkaProducer API Refactor](https://issues.apache.org/jira/browse/FLINK-4050)
The PR adds:
- `KafkaSerializerWrapper` and `KafkaDeserializerWrapper` wrappers for
using Kafka serializers with Flink
- A test case involving Confluent's `KafkaAvroSerializer` and
`KafkaAvroDeserializer`. They also use the schema registry from Confluent
(which I'm mocking with a simple test HTTP server in the test).
- A test validating that we are properly calling Kafka partitioners with
the producer
- Documentation for everything mentioned above.
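The wrapper idea can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the `KafkaSerializerWrapper` added by the PR: `TopicSerializer` and `ElementSchema` are locally defined stand-ins for a Kafka-style serializer (which receives the topic and the value) and a Flink-style serialization schema (which sees only the element), and all names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class SerializerWrapperSketch {

    /** Stand-in for a Kafka-style serializer: receives the topic and the value. */
    public interface TopicSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Stand-in for a Flink-style serialization schema: sees only the element. */
    public interface ElementSchema<T> {
        byte[] serializeValue(T element);
    }

    /** Adapts a topic-aware serializer to the element-only schema interface. */
    public static class Wrapper<T> implements ElementSchema<T> {
        private final TopicSerializer<T> serializer;
        private final String topic;

        public Wrapper(TopicSerializer<T> serializer, String topic) {
            this.serializer = serializer;
            this.topic = topic;
        }

        @Override
        public byte[] serializeValue(T element) {
            // Delegate to the wrapped serializer, supplying the fixed topic.
            return serializer.serialize(topic, element);
        }
    }

    public static void main(String[] args) {
        // A trivial string serializer used only for this illustration.
        TopicSerializer<String> utf8 = (topic, data) -> data.getBytes(StandardCharsets.UTF_8);
        ElementSchema<String> schema = new Wrapper<>(utf8, "example-topic");
        System.out.println(schema.serializeValue("hello").length); // prints 5
    }
}
```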
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rmetzger/flink flink2597
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2705.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2705
commit 76365abb8ae378eaee3809ec050a09e266d745a2
Author: Robert Metzger
Date: 2016-10-26T18:48:21Z
[FLINK-2597] Add wrappers for Kafka serializers, tests for Kafka
partitioner and documentation
> Add a test for Avro-serialized Kafka messages
> --
>
> Key: FLINK-2597
> URL: https://issues.apache.org/jira/browse/FLINK-2597
> Project: Flink
> Issue Type: Bug
> Components: Batch Connectors and Input/Output Formats, Kafka
> Connector
>Affects Versions: 0.10.0
>Reporter: Robert Metzger
>Assignee: Vimal
>Priority: Minor
>
> A user has asked for serializing Avro messages from Kafka.
> I think it's a legitimate use case that we should cover with a test case.
--
[
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612123#comment-15612123
]
ASF GitHub Bot commented on FLINK-2608:
---
Github user chermenin commented on the issue:
https://github.com/apache/flink/pull/2623
I excluded Kryo as a dependency from Chill and added a simple test to check Java
collections. The tests were added to `flink-tests` because it depends on
`flink-runtime` (where Chill is used).
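For context on why `Arrays.asList(..)` can behave differently from other Java collections during serialization: it returns an instance of the private nested class `java.util.Arrays$ArrayList`, not `java.util.ArrayList`. The sketch below only demonstrates that type difference and the copy-based workaround; the helper names are hypothetical and it makes no claim about the fix in this pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AsListTypes {
    /** Returns the runtime class name of the given list. */
    static String runtimeType(List<?> list) {
        return list.getClass().getName();
    }

    /** Copies any list into a plain java.util.ArrayList. */
    static <T> List<T> toArrayList(List<T> list) {
        return new ArrayList<>(list);
    }
}
```

Calling `AsListTypes.runtimeType(Arrays.asList(0, 0, 0))` yields `java.util.Arrays$ArrayList`, while the copy produced by `toArrayList` is a regular `java.util.ArrayList` with the same elements.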
> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> import java.io.Serializable;
> import java.util.Arrays;
> import java.util.List;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
>
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> text = env.fromElements(
>             "Who's there?",
>             "I think I hear them. Stand, ho! Who's there?");
>         // DOES NOT WORK
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works:
>         // List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>             .flatMap(new LineSplitter())
>             .withBroadcastSet(set, "set")
>             .groupBy(0)
>             .sum(1);
>         wordCounts.print();
>     }
>
>     public static class LineSplitter
>             implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>         List<Integer> integerList;
>         public TestClass(List<Integer> integerList) {
>             this.integerList = integerList;
>         }
>     }
> }
> {code}
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at
---
[
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chesnay Schepler closed FLINK-4037.
---
Resolution: Fixed
Fix Version/s: 1.2.0
> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
> Issue Type: Improvement
> Components: Webfrontend
>Reporter: Robert Metzger
> Fix For: 1.2.0
>
>
> As a follow-up to FLINK-4011: In order to allow the JobManager to unload all
> classes from a finished job, we need to convert the ExecutionGraph (and some
> attached objects like the ExecutionConfig) into a stringified version that does
> not contain any user classes.
> The web frontend can only show strings anyway.
--
[
https://issues.apache.org/jira/browse/FLINK-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612084#comment-15612084
]
ASF GitHub Bot commented on FLINK-3347:
---
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/2696
Thanks for the review @StephanEwen. Yes let's do it as you've proposed.
I've opened an [issue](https://issues.apache.org/jira/browse/FLINK-4944) for
replacing Akka's death watch with our own heartbeat on the `TaskManager` side.
> TaskManager (or its ActorSystem) need to restart in case they notice
> quarantine
> ---
>
> Key: FLINK-3347
> URL: https://issues.apache.org/jira/browse/FLINK-3347
> Project: Flink
> Issue Type: Improvement
> Components: TaskManager
>Affects Versions: 0.10.1
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.0.0, 1.2.0, 1.1.4
>
>
> There are cases where Akka quarantines remote actor systems. In that case, no
> further communication is possible with that actor system unless one of the
> two actor systems is restarted.
> The result is that a TaskManager is up and available, but cannot register at
> the JobManager (Akka refuses connection because of the quarantined state),
> making the TaskManager a useless process.
> I suggest letting the TaskManager restart itself once it notices that either
> it quarantined the JobManager, or the JobManager quarantined it.
> It is possible to recognize that by listening to certain events in the actor
> system event stream:
> http://stackoverflow.com/questions/32471088/akka-cluster-detecting-quarantined-state
--
Till Rohrmann created FLINK-4944:
Summary: Replace Akka's death watch with own heartbeat on the TM side
Key: FLINK-4944
URL: https://issues.apache.org/jira/browse/FLINK-4944
Project: Flink
Issue Type: Improvement
Components: TaskManager
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Fix For: 1.2.0
In order to properly implement FLINK-3347, the {{TaskManager}} must no longer
use Akka's death watch mechanism to detect {{JobManager}} failures. The reason
is that a hard {{JobManager}} failure will lead to quarantining the
{{JobManager's}} {{ActorSystem}} by the {{TaskManagers}}. This in combination
with FLINK-3347 would lead to a shutdown of all {{TaskManagers}}.
Instead we should use our own heartbeat signal to detect dead {{JobManagers}}.
In case of a heartbeat timeout, the {{TaskManager}} won't shut down but simply
cancel and clear everything.
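The heartbeat idea described above can be sketched roughly as follows. This is only an illustration under assumptions, not Flink's implementation: `HeartbeatMonitor`, its methods, and the explicit `nowMillis` parameters are hypothetical, and the `onTimeout` callback stands in for "cancel and clear everything".

```java
/** Hypothetical heartbeat monitor; not Flink's actual implementation. */
final class HeartbeatMonitor {
    private final long timeoutMillis;
    private final Runnable onTimeout;
    private long lastHeartbeatMillis;
    private boolean timedOut;

    HeartbeatMonitor(long timeoutMillis, long nowMillis, Runnable onTimeout) {
        this.timeoutMillis = timeoutMillis;
        this.lastHeartbeatMillis = nowMillis;
        this.onTimeout = onTimeout;
    }

    /** Called whenever a heartbeat from the monitored peer arrives. */
    synchronized void reportHeartbeat(long nowMillis) {
        lastHeartbeatMillis = nowMillis;
        timedOut = false;
    }

    /**
     * Called periodically. Runs the timeout action once per missed-heartbeat
     * episode and returns whether the peer is currently considered dead.
     */
    synchronized boolean checkTimeout(long nowMillis) {
        if (!timedOut && nowMillis - lastHeartbeatMillis > timeoutMillis) {
            timedOut = true;
            onTimeout.run();
        }
        return timedOut;
    }
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real component would more likely read a clock and schedule `checkTimeout` on a timer. Note that the timeout action does not stop the process, which matches the intent that the `TaskManager` keeps running after a heartbeat timeout.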
--
[
https://issues.apache.org/jira/browse/FLINK-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15612066#comment-15612066
]
ASF GitHub Bot commented on FLINK-3347:
---
Github user tillrohrmann closed the pull request at:
https://github.com/apache/flink/pull/2697
--
[
https://issues.apache.org/jira/browse/FLINK-4910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611984#comment-15611984
]
ASF GitHub Bot commented on FLINK-4910:
---
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/2691
Thanks for the reviews @StephanEwen and @aljoscha! I addressed your
comments in my last commit.
> Introduce safety net for closing file system streams
>
>
> Key: FLINK-4910
> URL: https://issues.apache.org/jira/browse/FLINK-4910
> Project: Flink
> Issue Type: Improvement
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Streams that are opened through {{FileSystem}} must be closed at the end of
> their life cycle. However, we found hints that some code forgets to close
> such streams.
> We should introduce a mechanism that i) closes leaked, unclosed streams after
> use and ii) provides logging that helps us track down and fix the sources
> of such leaks.
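One possible shape for such a safety net is a registry that tracks open streams, closes whatever is still registered at the end of a scope, and records where each leaked stream came from. The sketch below is an assumption-laden illustration, not the code from the pull request: `StreamSafetyNet` and its methods are hypothetical names, and an in-memory list stands in for real logging.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry that closes streams nobody closed explicitly. */
final class StreamSafetyNet implements Closeable {
    // Maps each open stream to a Throwable capturing where it was registered.
    private final Map<Closeable, Throwable> open = new IdentityHashMap<>();
    private final List<String> leakLog = new ArrayList<>();

    /** Tracks a newly opened stream and returns it unchanged. */
    synchronized <T extends Closeable> T register(T stream) {
        open.put(stream, new Throwable("stream registered here"));
        return stream;
    }

    /** Stops tracking a stream that was closed properly by its owner. */
    synchronized void unregister(Closeable stream) {
        open.remove(stream);
    }

    /** Closes every stream that is still registered and logs its origin. */
    @Override
    public synchronized void close() {
        for (Map.Entry<Closeable, Throwable> entry : open.entrySet()) {
            StackTraceElement[] trace = entry.getValue().getStackTrace();
            // Frame 0 is register() itself; frame 1, if present, is its caller.
            String origin = trace.length > 1 ? trace[1].toString() : "unknown";
            leakLog.add("Closing leaked stream registered at " + origin);
            try {
                entry.getKey().close();
            } catch (IOException e) {
                leakLog.add("Failed to close leaked stream: " + e);
            }
        }
        open.clear();
    }

    /** Returns a copy of the leak messages recorded so far. */
    synchronized List<String> leakLog() {
        return new ArrayList<>(leakLog);
    }
}
```

Capturing a `Throwable` at registration time is one simple way to get the "where was this opened" information that point ii) asks for; it has a cost per stream, so a real implementation might make it optional.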
--
[
https://issues.apache.org/jira/browse/FLINK-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611981#comment-15611981
]
ASF GitHub Bot commented on FLINK-4876:
---
Github user attachmentgenie commented on the issue:
https://github.com/apache/flink/pull/2680
@StephanEwen That seems like a reasonable thing to do. I updated the code to use the
new ConfigOption method.
> Allow web interface to be bound to a specific ip/interface/inetHost
> ---
>
> Key: FLINK-4876
> URL: https://issues.apache.org/jira/browse/FLINK-4876
> Project: Flink
> Issue Type: Improvement
> Components: Webfrontend
>Affects Versions: 1.2.0, 1.1.2, 1.1.3
>Reporter: Bram Vogelaar
>Assignee: Bram Vogelaar
>Priority: Minor
>
> Currently the web interface automatically binds to all interfaces on 0.0.0.0.
> IMHO there are some use cases for binding only to a specific IP address (e.g.
> access through an authenticated proxy, or not binding on the management or
> backup interface).
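The difference between binding to all interfaces and binding to one address can be shown with plain `java.net` sockets. This is a generic sketch, not the web frontend's code: `BindAddress`, `resolve`, and `listen` are hypothetical names, and treating a null or empty host as "all interfaces" is an assumption made for the example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

final class BindAddress {
    /** Picks the wildcard address when no host is configured, else the given host. */
    static InetSocketAddress resolve(String host, int port) {
        if (host == null || host.isEmpty()) {
            return new InetSocketAddress(port); // wildcard address: all interfaces
        }
        return new InetSocketAddress(host, port);
    }

    /** Opens a listening socket bound to the resolved address. */
    static ServerSocket listen(String host, int port) throws IOException {
        ServerSocket socket = new ServerSocket();
        socket.bind(resolve(host, port));
        return socket;
    }
}
```

With this sketch, `BindAddress.listen("127.0.0.1", 8081)` would accept connections only on the loopback interface, while `BindAddress.listen(null, 8081)` would listen on every interface.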
--
[
https://issues.apache.org/jira/browse/FLINK-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611963#comment-15611963
]
ASF GitHub Bot commented on FLINK-3347:
---
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/2697
Thanks for the review @StephanEwen. I will change the configuration name
and then merge the PR into the release-1.1 branch.
Travis passed locally and the failing test cases are unrelated.
--