[jira] [Updated] (FLINK-5546) java.io.tmpdir set to project build directory in surefire plugin

2017-01-22 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5546:
-
Summary: java.io.tmpdir set to project build directory in surefire 
plugin  (was: When multiple users run test, /tmp/cacheFile conflicts)

> java.io.tmpdir set to project build directory in surefire plugin
> ---
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.2.0, 1.3.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>Assignee: shijinkui
>
> When multiple Linux users run tests at the same time, the flink-runtime module 
> may fail. User A creates /tmp/cacheFile, and User B then has no permission to 
> access the folder.
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5546) When multiple users run test, /tmp/cacheFile conflicts

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834014#comment-15834014
 ] 

ASF GitHub Bot commented on FLINK-5546:
---

GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/3190

[FLINK-5546][build] When multiple users run test, /tmp/cacheFile conf…

Unit tests create files in java.io.tmpdir, which defaults to `/tmp` and has a 
capacity limit.
Creating the temporary files in the project target directory is more reasonable; 
`mvn clean` then removes them.
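The idea behind the fix can be sketched as a small standalone helper. This is an illustration, not the PR's actual change (which configures `java.io.tmpdir` through the Surefire plugin); the class name `PerBuildTmpDir` and the `target/tmp` location are assumptions mirroring the described behavior.

```java
import java.io.File;

// Hedged sketch: resolve a per-project temp directory under the build
// directory instead of the shared /tmp, so concurrent users do not
// collide on files like /tmp/cacheFile.
public class PerBuildTmpDir {
    static File resolveTmpDir(File projectBuildDir) {
        File tmp = new File(projectBuildDir, "tmp");
        // Living under target/ means `mvn clean` removes it afterwards.
        if (!tmp.exists() && !tmp.mkdirs()) {
            throw new IllegalStateException("Could not create " + tmp);
        }
        return tmp;
    }

    public static void main(String[] args) {
        System.out.println(resolveTmpDir(new File("target")).getPath());
    }
}
```

Because each Maven build gets its own `target` directory, two users building in separate checkouts never touch the same temp files.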
 
- [X] General
  - The pull request references the related JIRA issue ("[FLINK-5546] When 
multiple users run test, /tmp/cacheFile conflicts")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink FLINK-5546-tmp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3190.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3190


commit 3ec804a5f1de61f00ac49b1d4ed0ef005123daae
Author: shijinkui 
Date:   2017-01-23T07:37:44Z

[FLINK-5546][build] When multiple users run test, /tmp/cacheFile conflicts







[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834008#comment-15834008
 ] 

ASF GitHub Bot commented on FLINK-3318:
---

Github user chermenin commented on a diff in the pull request:

https://github.com/apache/flink/pull/2361#discussion_r97264252
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java ---
@@ -43,7 +43,14 @@ public State(final String name, final StateType 
stateType) {
this.name = name;
this.stateType = stateType;
 
-   stateTransitions = new ArrayList();
+   stateTransitions = new ArrayList<>();
+   }
+
+   public State(String name, StateType stateType, 
Collection stateTransitions) {
--- End diff --

It seems that this constructor is never used. What is that for?


> Add support for quantifiers to CEP's pattern API
> 
>
> Key: FLINK-3318
> URL: https://issues.apache.org/jira/browse/FLINK-3318
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers 
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds). 
> This would considerably enrich the set of supported patterns.
> Implementing the count bounds could be done by unrolling the pattern state. 
> In order to support the Kleene star operator, the {{NFACompiler}} has to be 
> extended to insert epsilon-transition between a Kleene start state and the 
> succeeding pattern state. In order to support {{?}}, one could insert two 
> paths from the preceding state, one which accepts the event and another which 
> directly goes into the next pattern state.







[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833957#comment-15833957
 ] 

ASF GitHub Bot commented on FLINK-5508:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3158
  
This is an identical backport / forwardport of the other 3 Mesos fixes, 
correct?
If so, since #3155 and #3156 both have +1s, and #3157 is also a +1 once the 
minor code style comments are addressed, this is also a +1 from my side.


> Remove Mesos dynamic class loading
> --
>
> Key: FLINK-5508
> URL: https://issues.apache.org/jira/browse/FLINK-5508
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.2.0, 1.3.0
>
>
> Mesos uses dynamic class loading in order to load the 
> {{ZooKeeperStateHandleStore}} and the {{CuratorFramework}} class. This can be 
> replaced by a compile time dependency.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833953#comment-15833953
 ] 

ASF GitHub Bot commented on FLINK-5508:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3157#discussion_r97253388
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -394,6 +395,14 @@ protected int runPrivileged(Configuration config, 
Configuration dynamicPropertie
}
}
 
+   if (mesosServices != null) {
+   try {
+   mesosServices.close(false);
+   } catch (Throwable tt) {
+   LOG.error("Error closing the 
ZooKeeperUtilityFactory.", tt);
--- End diff --

This error message is only relevant to `ZooKeeperMesosServices`, not 
`StandaloneMesosServices`.
Should we just use "Failed to clean up and close MesosServices." here, and 
rely on the stack trace in the logs to find out the cause?




[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833949#comment-15833949
 ] 

ASF GitHub Bot commented on FLINK-5508:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3157#discussion_r97256203
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Wrapper class for a {@link VersionedValue} so that we don't expose a 
curator dependency in our
+ * internal APIs. Such an exposure is problematic due to the relocation of 
curator.
+ */
+public class ZooKeeperVersionedValue {
+
+   private final VersionedValue versionedValue;
+
+   public ZooKeeperVersionedValue(VersionedValue versionedValue) {
+   this.versionedValue = 
Preconditions.checkNotNull(versionedValue);
+   }
+
+   VersionedValue getVersionedValue() {
--- End diff --

nitpick: I would place the public methods before any other package-private 
/ private access methods.




[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833952#comment-15833952
 ] 

ASF GitHub Bot commented on FLINK-5508:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3157#discussion_r97255788
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
---
@@ -376,15 +376,19 @@ public static ZooKeeperCheckpointIDCounter 
createCheckpointIDCounter(
}
}
 
-   private static String generateZookeeperPath(String root, String 
namespace) {
+   public static String generateZookeeperPath(String root, String 
namespace) {
if (!namespace.startsWith("/")) {
-   namespace = "/" + namespace;
+   namespace = '/' + namespace;
}
 
if (namespace.endsWith("/")) {
namespace = namespace.substring(0, namespace.length() - 
1);
}
 
+   if (root.endsWith("/")) {
--- End diff --

Nice catch ..
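The slash handling discussed in this diff can be restated as a self-contained sketch. The class name `ZkPaths` is made up for illustration (the real method lives in Flink's `ZooKeeperUtils`); the point is that stripping a trailing slash from `root`, in addition to normalizing `namespace`, prevents a double slash when the two are concatenated.

```java
// Hedged sketch of the path normalization added in the diff.
public class ZkPaths {
    static String generateZookeeperPath(String root, String namespace) {
        // Ensure the namespace starts with exactly one leading slash.
        if (!namespace.startsWith("/")) {
            namespace = '/' + namespace;
        }
        // Drop a trailing slash from the namespace.
        if (namespace.endsWith("/")) {
            namespace = namespace.substring(0, namespace.length() - 1);
        }
        // The fix under review: also drop a trailing slash from the root,
        // so "/flink/" + "/jobs" cannot become "/flink//jobs".
        if (root.endsWith("/")) {
            root = root.substring(0, root.length() - 1);
        }
        return root + namespace;
    }
}
```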




[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833948#comment-15833948
 ] 

ASF GitHub Bot commented on FLINK-5508:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3157#discussion_r97255571
  
--- Diff: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
 ---
@@ -73,13 +74,16 @@
 /**
  * General tests for the Mesos resource manager component.
  */
-public class MesosFlinkResourceManagerTest {
+public class MesosFlinkResourceManagerTest extends TestLogger {
 
private static final Logger LOG = 
LoggerFactory.getLogger(MesosFlinkResourceManagerTest.class);
 
private static ActorSystem system;
 
-   private static Configuration config = new Configuration() {{
+   private static Configuration config = new Configuration() {
+   private static final long serialVersionUID = 
-952579203067648838L;
+
+   {
--- End diff --

The indentation in this static block seems to be disordered (the following 
2 `setInteger`s should be indented with one more tab).
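The construct being reviewed is double-brace initialization: `new Configuration() {{ ... }}` creates an anonymous subclass with an instance initializer, which is why the diff also adds a `serialVersionUID` to it when the base class is serializable. A minimal sketch of the pattern, with a stand-in `Config` class instead of Flink's `Configuration` and an illustrative key name:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the double-brace initialization pattern from the diff.
public class DoubleBrace {
    static class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Map<String, Integer> ints = new HashMap<>();
        void setInteger(String key, int value) { ints.put(key, value); }
        Integer getInteger(String key) { return ints.get(key); }
    }

    static final Config CONFIG = new Config() {
        // The anonymous subclass is a distinct serializable class, so it
        // should declare its own serialVersionUID, as the diff does.
        private static final long serialVersionUID = -1L;
        {
            setInteger("some.test.setting", 10); // key name is illustrative
        }
    };
}
```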




[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833951#comment-15833951
 ] 

ASF GitHub Bot commented on FLINK-5508:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3157#discussion_r97257878
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+import java.io.Serializable;
+import java.util.concurrent.Executor;
+
+/**
+ * Creates ZooKeeper utility classes without exposing the {@link 
CuratorFramework} dependency. The
+ * curator framework is cached in this instance and shared among all 
created ZooKeeper utility
+ * instances. This requires that the utility classes DO NOT close the 
provided curator framework.
+ *
+ * The curator framework is closed by calling the {@link 
#close(boolean)} method.
+ */
+public class ZooKeeperUtilityFactory {
+
+   private final CuratorFramework root;
+
+   // Facade bound to the provided path
+   private final CuratorFramework facade;
+
+   public ZooKeeperUtilityFactory(Configuration configuration, String 
path) throws Exception {
+   root = ZooKeeperUtils.startCuratorFramework(configuration);
+
+   
root.newNamespaceAwareEnsurePath(path).ensure(root.getZookeeperClient());
+   facade = 
root.usingNamespace(ZooKeeperUtils.generateZookeeperPath(root.getNamespace(), 
path));
+   }
+
+   /**
+* Closes the ZooKeeperUtilityFactory. This entails closing the cached 
{@link CuratorFramework}
+* instance. If cleanup is true, then the initial path and all its 
children are deleted.
+*
+* @param cleanup deletes the initial path and all of its children to 
clean up
+* @throws Exception when deleting the znodes
+*/
+   public void close(boolean cleanup) throws Exception {
+   if (cleanup) {
+   facade.delete().deletingChildrenIfNeeded().forPath("/");
+   }
+
+   root.close();
+   }
+
+   /**
+* Creates a {@link ZooKeeperStateHandleStore} instance with the 
provided arguments.
+*
+* @param zkStateHandleStorePath specifying the path in ZooKeeper to 
store the state handles to
+* @param stateStorageHelper storing the actual state data
+* @param executor to run asynchronous callbacks of the state handle 
store
+* @param  Type of the state to be stored
+* @return a ZooKeeperStateHandleStore instance
+* @throws Exception if ZooKeeper could not create the provided state 
handle store path in
+* ZooKeeper
--- End diff --

nit: Does this line in the Javadoc need to be split into 2 lines? I don't 
think it's exceeding the max character limit, is it?




[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833950#comment-15833950
 ] 

ASF GitHub Bot commented on FLINK-5508:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3157#discussion_r97257309
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+import java.io.Serializable;
+import java.util.concurrent.Executor;
+
+/**
+ * Creates ZooKeeper utility classes without exposing the {@link 
CuratorFramework} dependency. The
+ * curator framework is cached in this instance and shared among all 
created ZooKeeper utility
+ * instances. This requires that the utility classes DO NOT close the 
provided curator framework.
+ *
+ * The curator framework is closed by calling the {@link 
#close(boolean)} method.
+ */
+public class ZooKeeperUtilityFactory {
+
+   private final CuratorFramework root;
+
+   // Facade bound to the provided path
+   private final CuratorFramework facade;
+
+   public ZooKeeperUtilityFactory(Configuration configuration, String 
path) throws Exception {
--- End diff --

Should perform precondition checks for arguments.
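The requested precondition checks would look like the pattern already used elsewhere in the PR (e.g. `Preconditions.checkNotNull` in `ZooKeeperVersionedValue`). A minimal sketch, using `java.util.Objects` instead of Flink's `Preconditions` to stay self-contained, with an illustrative `Factory` class standing in for the constructor under review:

```java
import java.util.Objects;

// Hedged sketch of constructor precondition checks as the reviewer asks.
public class Factory {
    private final Object configuration;
    private final String path;

    public Factory(Object configuration, String path) {
        // Fail fast with a clear message instead of a later NPE deep
        // inside the ZooKeeper setup.
        this.configuration = Objects.requireNonNull(configuration, "configuration");
        this.path = Objects.requireNonNull(path, "path");
    }
}
```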








[GitHub] flink pull request #3157: [FLINK-5508] [mesos] Introduce ZooKeeperUtilityFac...

2017-01-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3157#discussion_r97257878
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+import java.io.Serializable;
+import java.util.concurrent.Executor;
+
+/**
+ * Creates ZooKeeper utility classes without exposing the {@link 
CuratorFramework} dependency. The
+ * curator framework is cached in this instance and shared among all 
created ZooKeeper utility
+ * instances. This requires that the utility classes DO NOT close the 
provided curator framework.
+ *
+ * The curator framework is closed by calling the {@link 
#close(boolean)} method.
+ */
+public class ZooKeeperUtilityFactory {
+
+   private final CuratorFramework root;
+
+   // Facade bound to the provided path
+   private final CuratorFramework facade;
+
+   public ZooKeeperUtilityFactory(Configuration configuration, String 
path) throws Exception {
+   root = ZooKeeperUtils.startCuratorFramework(configuration);
+
+   root.newNamespaceAwareEnsurePath(path).ensure(root.getZookeeperClient());
+   facade = root.usingNamespace(ZooKeeperUtils.generateZookeeperPath(root.getNamespace(), path));
+   }
+
+   /**
+* Closes the ZooKeeperUtilityFactory. This entails closing the cached 
{@link CuratorFramework}
+* instance. If cleanup is true, then the initial path and all its 
children are deleted.
+*
+* @param cleanup deletes the initial path and all of its children to 
clean up
+* @throws Exception when deleting the znodes
+*/
+   public void close(boolean cleanup) throws Exception {
+   if (cleanup) {
+   facade.delete().deletingChildrenIfNeeded().forPath("/");
+   }
+
+   root.close();
+   }
+
+   /**
+* Creates a {@link ZooKeeperStateHandleStore} instance with the 
provided arguments.
+*
+* @param zkStateHandleStorePath specifying the path in ZooKeeper to 
store the state handles to
+* @param stateStorageHelper storing the actual state data
+* @param executor to run asynchronous callbacks of the state handle 
store
+* @param  Type of the state to be stored
+* @return a ZooKeeperStateHandleStore instance
+* @throws Exception if ZooKeeper could not create the provided state 
handle store path in
+* ZooKeeper
--- End diff --

nit: Does this line in the Javadoc need to be split into 2 lines? I don't 
think it's exceeding the max character limit, is it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3157: [FLINK-5508] [mesos] Introduce ZooKeeperUtilityFac...

2017-01-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3157#discussion_r97256203
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Wrapper class for a {@link VersionedValue} so that we don't expose a 
curator dependency in our
+ * internal APIs. Such an exposure is problematic due to the relocation of 
curator.
+ */
+public class ZooKeeperVersionedValue<T> {
+
+   private final VersionedValue<T> versionedValue;
+
+   public ZooKeeperVersionedValue(VersionedValue<T> versionedValue) {
+   this.versionedValue = 
Preconditions.checkNotNull(versionedValue);
+   }
+
+   VersionedValue<T> getVersionedValue() {
--- End diff --

nitpick: I would place the public methods before any other package-private 
/ private access methods.


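The dependency-hiding pattern this class implements can be sketched in isolation. In the sketch below, `VersionedValue` is a local stand-in for curator's recipe class, and the `getVersion`/`getValue` accessors are illustrative additions rather than the reviewed class's actual API:

```java
// Sketch of wrapping a third-party (possibly shaded/relocated) type behind a
// project-owned class so the dependency never leaks into internal APIs.
public class WrapperDemo {

    // Local stand-in for curator's VersionedValue recipe class (hypothetical).
    public static class VersionedValue<T> {
        final int version;
        final T value;

        public VersionedValue(int version, T value) {
            this.version = version;
            this.value = value;
        }
    }

    // Project-owned wrapper: the public surface never mentions the wrapped type.
    public static class ZooKeeperVersionedValue<T> {

        private final VersionedValue<T> versionedValue;

        public ZooKeeperVersionedValue(VersionedValue<T> versionedValue) {
            if (versionedValue == null) {
                throw new NullPointerException("versionedValue");
            }
            this.versionedValue = versionedValue;
        }

        public int getVersion() {
            return versionedValue.version;
        }

        public T getValue() {
            return versionedValue.value;
        }

        // package-private: only runtime code in the same package unwraps it
        VersionedValue<T> getVersionedValue() {
            return versionedValue;
        }
    }

    public static void main(String[] args) {
        ZooKeeperVersionedValue<String> v =
                new ZooKeeperVersionedValue<>(new VersionedValue<>(1, "state"));
        System.out.println(v.getVersion() + ":" + v.getValue()); // prints 1:state
    }
}
```

This keeps the relocation of curator an implementation detail: callers compile only against the wrapper.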


[GitHub] flink pull request #3157: [FLINK-5508] [mesos] Introduce ZooKeeperUtilityFac...

2017-01-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3157#discussion_r97255788
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
---
@@ -376,15 +376,19 @@ public static ZooKeeperCheckpointIDCounter 
createCheckpointIDCounter(
}
}
 
-   private static String generateZookeeperPath(String root, String 
namespace) {
+   public static String generateZookeeperPath(String root, String 
namespace) {
if (!namespace.startsWith("/")) {
-   namespace = "/" + namespace;
+   namespace = '/' + namespace;
}
 
if (namespace.endsWith("/")) {
namespace = namespace.substring(0, namespace.length() - 
1);
}
 
+   if (root.endsWith("/")) {
--- End diff --

Nice catch ..


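The normalization logic under review can be exercised standalone. The method body below mirrors the diff, including the newly added trailing-slash check on `root`; the class name and `main` driver are illustrative:

```java
// Sketch of ZooKeeperUtils.generateZookeeperPath as patched in this PR:
// ensure exactly one '/' between root and namespace.
public class ZkPathDemo {

    public static String generateZookeeperPath(String root, String namespace) {
        if (!namespace.startsWith("/")) {
            namespace = '/' + namespace;                                // ensure leading slash
        }
        if (namespace.endsWith("/")) {
            namespace = namespace.substring(0, namespace.length() - 1); // drop trailing slash
        }
        if (root.endsWith("/")) {
            root = root.substring(0, root.length() - 1);                // the fix: avoid "//"
        }
        return root + namespace;
    }

    public static void main(String[] args) {
        System.out.println(generateZookeeperPath("/flink/", "cluster/")); // prints /flink/cluster
    }
}
```

Without the third check, a root of "/flink/" plus a namespace of "cluster" would produce "/flink//cluster", which is the bug the reviewer is praising the catch for.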


[GitHub] flink pull request #3157: [FLINK-5508] [mesos] Introduce ZooKeeperUtilityFac...

2017-01-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3157#discussion_r97255571
  
--- Diff: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
 ---
@@ -73,13 +74,16 @@
 /**
  * General tests for the Mesos resource manager component.
  */
-public class MesosFlinkResourceManagerTest {
+public class MesosFlinkResourceManagerTest extends TestLogger {
 
private static final Logger LOG = 
LoggerFactory.getLogger(MesosFlinkResourceManagerTest.class);
 
private static ActorSystem system;
 
-   private static Configuration config = new Configuration() {{
+   private static Configuration config = new Configuration() {
+   private static final long serialVersionUID = 
-952579203067648838L;
+
+   {
--- End diff --

The indentation in this static block seems to be disordered (the following 
2 `setInteger`s should be indented with one more tab).


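The construct under review is an anonymous `Configuration` subclass configured in an instance initializer. Because that anonymous class is itself a new `Serializable` class, it declares its own `serialVersionUID`. A minimal sketch, where this `Configuration` is a stand-in for Flink's with only the methods the sketch needs:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Sketch of configuring a Serializable object via an anonymous subclass with
// an instance initializer ("double-brace" style), as in the test under review.
public class DoubleBraceDemo {

    // Stand-in for org.apache.flink.configuration.Configuration (illustrative).
    public static class Configuration implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Map<String, Integer> values = new HashMap<>();

        public void setInteger(String key, int value) {
            values.put(key, value);
        }

        public int getInteger(String key, int defaultValue) {
            return values.getOrDefault(key, defaultValue);
        }
    }

    public static final Configuration CONFIG = new Configuration() {
        // the anonymous subclass is a distinct Serializable class, so it gets its own UID
        private static final long serialVersionUID = 1L;

        {
            setInteger("slots", 2);    // initializer body: indent one level deeper
            setInteger("retries", 3);  // than the serialVersionUID field, per the review comment
        }
    };

    public static void main(String[] args) {
        System.out.println(CONFIG.getInteger("slots", 0)); // prints 2
    }
}
```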


[jira] [Commented] (FLINK-5495) ZooKeeperMesosWorkerStore cannot be instantiated

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833895#comment-15833895
 ] 

ASF GitHub Bot commented on FLINK-5495:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3155
  
+1 to merge, LGTM.


> ZooKeeperMesosWorkerStore cannot be instantiated
> 
>
> Key: FLINK-5495
> URL: https://issues.apache.org/jira/browse/FLINK-5495
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>
> The {{ZooKeeperMesosWorkerStore}} cannot be instantiated because it 
> dynamically instantiates a {{ZooKeeperStateHandleStore}} without providing an 
> {{Executor}} to the constructor. This effectively breaks Mesos HA mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3155: [FLINK-5495] [mesos] Provide executor to ZooKeeperMesosWo...

2017-01-22 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3155
  
+1 to merge, LGTM.




[GitHub] flink pull request #3186: [FLINK-5582] [streaming] Add a general distributiv...

2017-01-22 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97246135
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example of how to use this interface is below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
--- End diff --

TableAPI UDAGG will eventually be translated to this windowStream API. The 
accumulate and retract operations will be handled in this add function. I think 
it is OK if we "view retractions as adding negative values".




[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833830#comment-15833830
 ] 

ASF GitHub Bot commented on FLINK-5582:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97246135
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example of how to use this interface is below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
--- End diff --

TableAPI UDAGG will eventually be translated to this windowStream API. The 
accumulate and retract operations will be handled in this add function. I think 
it is OK if we "view retractions as adding negative values".


> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final-aggregation.
> I suggest to 

[jira] [Assigned] (FLINK-5546) When multiple users run test, /tmp/cacheFile conflicts

2017-01-22 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui reassigned FLINK-5546:


Assignee: shijinkui

> When multiple users run test, /tmp/cacheFile conflicts
> --
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.2.0, 1.3.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>Assignee: shijinkui
>
> When multiple Linux users run tests at the same time, the flink-runtime module 
> may fail. User A creates /tmp/cacheFile, and User B will have no permission to 
> access the folder.
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8





[jira] [Updated] (FLINK-5546) When multiple users run test, /tmp/cacheFile conflicts

2017-01-22 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5546:
-
Assignee: (was: shijinkui)

> When multiple users run test, /tmp/cacheFile conflicts
> --
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.2.0, 1.3.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When multiple Linux users run tests at the same time, the flink-runtime module 
> may fail. User A creates /tmp/cacheFile, and User B will have no permission to 
> access the folder.
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8





[jira] [Assigned] (FLINK-5546) When multiple users run test, /tmp/cacheFile conflicts

2017-01-22 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui reassigned FLINK-5546:


Assignee: shijinkui

> When multiple users run test, /tmp/cacheFile conflicts
> --
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.2.0, 1.3.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>Assignee: shijinkui
>
> When multiple Linux users run tests at the same time, the flink-runtime module 
> may fail. User A creates /tmp/cacheFile, and User B will have no permission to 
> access the folder.
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8



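The direction proposed for this issue is to point java.io.tmpdir at the project build directory in the surefire configuration, so concurrent users never collide in /tmp and `mvn clean` removes the leftovers. A minimal sketch of such a configuration; the `target/tmp` location is illustrative, not necessarily what the eventual Flink patch uses:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <systemPropertyVariables>
      <!-- per-build temp dir under target/, removed by `mvn clean` -->
      <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
    </systemPropertyVariables>
  </configuration>
</plugin>
```

Depending on the JDK and the test code, the directory may need to exist before tests write to it, so builds typically pair this with a `mkdir` step bound to an earlier phase (e.g. via maven-antrun-plugin).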


[jira] [Resolved] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5582.
-
Resolution: Implemented

Implemented in 09380e49256bff924734b9a932808e0f4daa7e5c

> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final-aggregation.
> I suggest to add a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging the accumulated type.
> The proposed interface is below. This type of interface is found in many 
> APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing 
> operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>   ACC createAccumulator();
>   void add(IN value, ACC accumulator);
>   OUT getResult(ACC accumulator);
>   ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
> long count;
> long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Integer value, AverageAccumulator acc) {
> acc.sum += value;
> acc.count++;
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 
> 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Datum value, AverageAccumulator acc) {
> acc.count += value.getWeight();
> acc.sum += value.getValue();
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> {code}




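The proposed interface and the Average example above can be exercised end to end. The interface and `Average` are taken from the issue description; the class name and the pre-/final-aggregation driver are illustrative additions showing the distributive property (two partial accumulators merged before the result is extracted):

```java
// A runnable sketch of the AggregateFunction added by FLINK-5582.
public class AverageAggregateDemo {

    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class AverageAccumulator {
        long count;
        long sum;
    }

    static class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {

        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }

        public void add(Integer value, AverageAccumulator acc) {
            acc.sum += value;
            acc.count++;
        }

        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }

        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }
    }

    // The distributive property: two independently filled accumulators
    // (pre-aggregation) are merged before the final result is extracted.
    public static double preAndFinalAggregate() {
        Average avg = new Average();
        AverageAccumulator partial1 = avg.createAccumulator();
        AverageAccumulator partial2 = avg.createAccumulator();
        avg.add(1, partial1);
        avg.add(2, partial1);
        avg.add(3, partial2);
        avg.add(6, partial2);
        return avg.getResult(avg.merge(partial1, partial2));
    }

    public static void main(String[] args) {
        System.out.println(preAndFinalAggregate()); // (1 + 2 + 3 + 6) / 4 = 3.0
    }
}
```

Hierarchical pre-/final-aggregation works precisely because `merge` is associative on the accumulator type, which `ReduceFunction` offers only for a single type and `FoldFunction` not at all.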

[jira] [Closed] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5582.
---

> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final-aggregation.
> I suggest to add a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging the accumulated type.
> The proposed interface is below. This type of interface is found in many 
> APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing 
> operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>   ACC createAccumulator();
>   void add(IN value, ACC accumulator);
>   OUT getResult(ACC accumulator);
>   ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
> long count;
> long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Integer value, AverageAccumulator acc) {
> acc.sum += value;
> acc.count++;
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 
> 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Datum value, AverageAccumulator acc) {
> acc.count += value.getWeight();
> acc.sum += value.getValue();
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> {code}





[jira] [Closed] (FLINK-5590) Create a proper internal state hierarchy

2017-01-22 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5590.
---

> Create a proper internal state hierarchy
> 
>
> Key: FLINK-5590
> URL: https://issues.apache.org/jira/browse/FLINK-5590
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Currently, the state interfaces (like {{ListState}}, {{ValueState}}, 
> {{ReducingState}}) are very sparse and contain only methods exposed to the 
> users. That makes sense to keep the public stable API minimal
> At the same time, the runtime needs more methods for its internal interaction 
> with state, such as:
>   - setting namespaces
>   - accessing raw values
>   - merging namespaces
> These are currently realized by re-creating or re-obtaining the state objects 
> from the KeyedStateBackend. That method causes quite an overhead for each 
> access to the state
> The KeyedStateBackend tries to do some tricks to reduce that overhead, but 
> does it only partially and induces other overhead in the course.
> The root cause of all these issues is a problem in the design: There is no 
> proper "internal state abstraction" in a similar way as there is an external 
> state abstraction (the public state API).
> We should add a similar hierarchy of states for the internal methods. It 
> would look like the example below:
> {code}
>  * State
>  *  +-- MergingState
>  *  |    +-- ReducingState
>  *  |    +-- ListState
>  *  +-- InternalKvState                  (extends State)
>  *       +-- InternalMergingState        (extends MergingState, InternalKvState)
>  *            +-- InternalReducingState  (extends ReducingState, InternalMergingState)
>  *            +-- InternalListState      (extends ListState, InternalMergingState)
> {code}





[jira] [Resolved] (FLINK-5590) Create a proper internal state hierarchy

2017-01-22 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5590.
-
Resolution: Fixed

Fixed in 3b97128f05bacfb80afe4a2a49741c31ff306cd2

> Create a proper internal state hierarchy
> 
>
> Key: FLINK-5590
> URL: https://issues.apache.org/jira/browse/FLINK-5590
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Currently, the state interfaces (like {{ListState}}, {{ValueState}}, 
> {{ReducingState}}) are very sparse and contain only methods exposed to the 
> users. That makes sense to keep the public stable API minimal
> At the same time, the runtime needs more methods for its internal interaction 
> with state, such as:
>   - setting namespaces
>   - accessing raw values
>   - merging namespaces
> These are currently realized by re-creating or re-obtaining the state objects 
> from the KeyedStateBackend. That method causes quite an overhead for each 
> access to the state
> The KeyedStateBackend tries to do some tricks to reduce that overhead, but 
> does it only partially and induces other overhead in the course.
> The root cause of all these issues is a problem in the design: There is no 
> proper "internal state abstraction" in a similar way as there is an external 
> state abstraction (the public state API).
> We should add a similar hierarchy of states for the internal methods. It 
> would look like the example below:
> {code}
>  * State
>  *  +-- MergingState
>  *  |    +-- ReducingState
>  *  |    +-- ListState
>  *  +-- InternalKvState                  (extends State)
>  *       +-- InternalMergingState        (extends MergingState, InternalKvState)
>  *            +-- InternalReducingState  (extends ReducingState, InternalMergingState)
>  *            +-- InternalListState      (extends ListState, InternalMergingState)
> {code}



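The internal/public split described above can be sketched as a toy: the public interface stays minimal while an internal interface adds the runtime-only operations such as namespace handling. The names `ValueState` and `setCurrentNamespace` follow the general Flink vocabulary, but this `HeapValueState` and its method set are illustrative, not the actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of an internal state hierarchy: the runtime works against
// InternalKvState without widening the user-facing ValueState interface.
public class InternalStateDemo {

    // public, user-facing interface: minimal, stable surface
    interface ValueState<T> {
        T value();
        void update(T value);
    }

    // internal interface: operations only the runtime needs
    interface InternalKvState<N> {
        void setCurrentNamespace(N namespace);
    }

    // one implementation exposes both views
    public static class HeapValueState<N, T> implements ValueState<T>, InternalKvState<N> {

        private final Map<N, T> valuesByNamespace = new HashMap<>();
        private N currentNamespace;

        public void setCurrentNamespace(N namespace) {
            this.currentNamespace = namespace;
        }

        public T value() {
            return valuesByNamespace.get(currentNamespace);
        }

        public void update(T value) {
            valuesByNamespace.put(currentNamespace, value);
        }
    }

    public static void main(String[] args) {
        HeapValueState<String, Long> state = new HeapValueState<>();
        state.setCurrentNamespace("window-1");
        state.update(7L);
        state.setCurrentNamespace("window-2");
        state.update(9L);
        state.setCurrentNamespace("window-1"); // switch back without re-obtaining the state object
        System.out.println(state.value());     // prints 7
    }
}
```

Switching namespaces on the same object is the point of the issue: no re-creating or re-obtaining state from the backend on every access.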


[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833726#comment-15833726
 ] 

ASF GitHub Bot commented on FLINK-5582:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3186


> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final-aggregation.
> I suggest to add a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging the accumulated type.
> The proposed interface is below. This type of interface is found in many 
> APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing 
> operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>   ACC createAccumulator();
>   void add(IN value, ACC accumulator);
>   OUT getResult(ACC accumulator);
>   ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
> long count;
> long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Integer value, AverageAccumulator acc) {
> acc.sum += value;
> acc.count++;
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 
> 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Datum value, AverageAccumulator acc) {
> acc.count += value.getWeight();
> acc.sum += value.getValue();
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> {code}
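The distributive property claimed above — that merging partially aggregated accumulators gives the same result as a single pass over all values — can be sketched with the proposed interface. This is a self-contained illustration, not Flink code; the interface and example classes are redeclared locally (following the ticket's proposal) so the sketch compiles on its own:

```java
import java.util.Arrays;
import java.util.List;

// Local stand-in for the proposed interface; names follow the ticket.
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

class AverageAccumulator {
    long count;
    long sum;
}

class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
    public AverageAccumulator createAccumulator() { return new AverageAccumulator(); }
    public void add(Integer value, AverageAccumulator acc) { acc.sum += value; acc.count++; }
    public Double getResult(AverageAccumulator acc) { return acc.sum / (double) acc.count; }
    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
}

public class DistributiveDemo {
    // Pre-aggregate one partition into its own accumulator.
    static AverageAccumulator preAggregate(Average f, List<Integer> partition) {
        AverageAccumulator acc = f.createAccumulator();
        for (int v : partition) {
            f.add(v, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        Average f = new Average();

        // Two partitions aggregated independently (the "pre-aggregation").
        AverageAccumulator p1 = preAggregate(f, Arrays.asList(1, 2, 3));
        AverageAccumulator p2 = preAggregate(f, Arrays.asList(4, 5));

        // Final aggregation: merge the partial accumulators.
        double hierarchical = f.getResult(f.merge(p1, p2));

        // Single-pass aggregation over all values for comparison.
        double direct = f.getResult(preAggregate(f, Arrays.asList(1, 2, 3, 4, 5)));

        System.out.println(hierarchical + " == " + direct);  // both are 3.0
    }
}
```

This is exactly what a {{FoldFunction}} cannot do: its accumulated type cannot be merged, so pre-aggregates cannot be combined in a final step.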





[GitHub] flink pull request #3186: [FLINK-5582] [streaming] Add a general distributiv...

2017-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3186


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833701#comment-15833701
 ] 

ASF GitHub Bot commented on FLINK-5582:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3186
  
Since this requires constant, extensive merge-conflict resolution with 
master, I want to merge this soon.

@shaoxuan-wang has tested it live and the CI passes as well...


> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final-aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging the accumulator type.
> The proposed interface is below. This type of interface is found in many 
> APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing 
> operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>   ACC createAccumulator();
>   void add(IN value, ACC accumulator);
>   OUT getResult(ACC accumulator);
>   ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
> long count;
> long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Integer value, AverageAccumulator acc) {
> acc.sum += value;
> acc.count++;
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 
> 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Datum value, AverageAccumulator acc) {
> acc.count += value.getWeight();
> acc.sum += value.getValue();
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> {code}





[GitHub] flink issue #3186: [FLINK-5582] [streaming] Add a general distributive aggre...

2017-01-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3186
  
Since this requires constant, extensive merge-conflict resolution with 
master, I want to merge this soon.

@shaoxuan-wang has tested it live and the CI passes as well...




[jira] [Commented] (FLINK-5608) Cancel button not always visible

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833696#comment-15833696
 ] 

ASF GitHub Bot commented on FLINK-5608:
---

GitHub user rehevkor5 opened a pull request:

https://github.com/apache/flink/pull/3189

[FLINK-5608] [webfrontend] Cancel button stays visible in narrow windows

- Most importantly, the Cancel button has been changed to float
right, and will only wrap downward if pushed out by the job name
- Also combined the job name and job id into a single horizontal
element, reducing the overall horizontal space taken by the main
navbar components in the job view, making the main navbar components
less likely to wrap downward and be overlapped by the secondary navbar.
- Compiled code has been rebuilt

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rehevkor5/flink 
ui_job_cancel_button_layout_issue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3189


commit 33e3a14ebd3d614a6e5d4eae90f5985416e0b995
Author: Shannon Carey 
Date:   2017-01-22T20:52:43Z

[FLINK-5608] [webfrontend] Cancel button stays visible in narrow windows

- Most importantly, the Cancel button has been changed to float
right, and will only wrap downward if pushed out by the job name
- Also combined the job name and job id into a single horizontal
element, reducing the overall horizontal space taken by the main
navbar components in the job view, making the main navbar components
less likely to wrap downward and be overlapped by the secondary navbar.
- Compiled code has been rebuilt




> Cancel button not always visible
> 
>
> Key: FLINK-5608
> URL: https://issues.apache.org/jira/browse/FLINK-5608
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.1.4
>Reporter: Shannon Carey
>Assignee: Shannon Carey
>Priority: Minor
>
> When the window is not wide enough, or when the job name is too long, the 
> "Cancel" button in the Job view of the web UI is not visible because it is 
> the first element that gets wrapped down and gets covered by the secondary 
> navbar (the tabs). This causes us to often need to resize the browser wider 
> than our monitor in order to use the cancel button.
> In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the 
> content may wrap, especially if the content's horizontal width is not known & 
> fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any 
> unexpected change in height will result in overlap with the rest of the 
> normal-flow content in the page. The Bootstrap docs explain this in their 
> "Overflowing content" callout.
> I am submitting a PR which does not attempt to resolve all issues with the 
> fixed navbar approach, but attempts to improve the situation by using less 
> horizontal space and by altering the layout approach of the Cancel button.





[GitHub] flink pull request #3189: [FLINK-5608] [webfrontend] Cancel button stays vis...

2017-01-22 Thread rehevkor5
GitHub user rehevkor5 opened a pull request:

https://github.com/apache/flink/pull/3189

[FLINK-5608] [webfrontend] Cancel button stays visible in narrow windows

- Most importantly, the Cancel button has been changed to float
right, and will only wrap downward if pushed out by the job name
- Also combined the job name and job id into a single horizontal
element, reducing the overall horizontal space taken by the main
navbar components in the job view, making the main navbar components
less likely to wrap downward and be overlapped by the secondary navbar.
- Compiled code has been rebuilt

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rehevkor5/flink 
ui_job_cancel_button_layout_issue

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3189


commit 33e3a14ebd3d614a6e5d4eae90f5985416e0b995
Author: Shannon Carey 
Date:   2017-01-22T20:52:43Z

[FLINK-5608] [webfrontend] Cancel button stays visible in narrow windows

- Most importantly, the Cancel button has been changed to float
right, and will only wrap downward if pushed out by the job name
- Also combined the job name and job id into a single horizontal
element, reducing the overall horizontal space taken by the main
navbar components in the job view, making the main navbar components
less likely to wrap downward and be overlapped by the secondary navbar.
- Compiled code has been rebuilt






[jira] [Created] (FLINK-5608) Cancel button not always visible

2017-01-22 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-5608:


 Summary: Cancel button not always visible
 Key: FLINK-5608
 URL: https://issues.apache.org/jira/browse/FLINK-5608
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.1.4
Reporter: Shannon Carey
Assignee: Shannon Carey
Priority: Minor


When the window is not wide enough, or when the job name is too long, the 
"Cancel" button in the Job view of the web UI is not visible because it is the 
first element that gets wrapped down and gets covered by the secondary navbar 
(the tabs). This causes us to often need to resize the browser wider than our 
monitor in order to use the cancel button.

In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the 
content may wrap, especially if the content's horizontal width is not known & 
fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any 
unexpected change in height will result in overlap with the rest of the 
normal-flow content in the page. The Bootstrap docs explain this in their 
"Overflowing content" callout.

I am submitting a PR which does not attempt to resolve all issues with the 
fixed navbar approach, but attempts to improve the situation by using less 
horizontal space and by altering the layout approach of the Cancel button.





[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833675#comment-15833675
 ] 

ASF GitHub Bot commented on FLINK-5582:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3186
  
@shaoxuan-wang I cannot reproduce the compile error you posted.
The latest commit also gets a green light from Travis CI: 
https://travis-ci.org/StephanEwen/incubator-flink/builds/194239397


> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final-aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging the accumulator type.
> The proposed interface is below. This type of interface is found in many 
> APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing 
> operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>   ACC createAccumulator();
>   void add(IN value, ACC accumulator);
>   OUT getResult(ACC accumulator);
>   ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
> long count;
> long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Integer value, AverageAccumulator acc) {
> acc.sum += value;
> acc.count++;
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 
> 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Datum value, AverageAccumulator acc) {
> acc.count += value.getWeight();
> acc.sum += value.getValue();
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> {code}





[GitHub] flink issue #3186: [FLINK-5582] [streaming] Add a general distributive aggre...

2017-01-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3186
  
@shaoxuan-wang I cannot reproduce the compile error you posted.
The latest commit also gets a green light from Travis CI: 
https://travis-ci.org/StephanEwen/incubator-flink/builds/194239397




[jira] [Resolved] (FLINK-5454) Add Documentation about how to tune Checkpointing for large state

2017-01-22 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5454.
-
Resolution: Fixed

Fixed in
  - 1.2.0 via 099cdd01013c6959d3785088204601b926c04193
  - 1.3.0 via 392b2e9a0595a947a24e1504cbfd5f9cabc1c86a

> Add Documentation about how to tune Checkpointing for large state
> -
>
> Key: FLINK-5454
> URL: https://issues.apache.org/jira/browse/FLINK-5454
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.3.0
>
>






[jira] [Closed] (FLINK-5454) Add Documentation about how to tune Checkpointing for large state

2017-01-22 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5454.
---

> Add Documentation about how to tune Checkpointing for large state
> -
>
> Key: FLINK-5454
> URL: https://issues.apache.org/jira/browse/FLINK-5454
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.3.0
>
>






[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833605#comment-15833605
 ] 

ASF GitHub Bot commented on FLINK-5582:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97227451
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example of how to use this interface is below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
--- End diff --

My first feeling is to keep the name `add()` because it fits better 
together with the term `Accumulator`. One can view retractions as adding 
negative values. What do you think about that?
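The "retractions as adding negative values" idea above can be made concrete for the average accumulator. The sketch below is hypothetical — the proposed interface has no retract() method, and the class and method names here are invented for illustration:

```java
// Minimal accumulator, mirroring the AverageAccumulator from the proposal.
class AvgAcc {
    long count;
    long sum;
}

public class RetractionSketch {
    static void add(int value, AvgAcc acc) {
        acc.sum += value;
        acc.count++;
    }

    // Retraction expressed through addition: contribute the negated value
    // and a negative unit of count, instead of a dedicated retract() method.
    static void retractViaAdd(int value, AvgAcc acc) {
        acc.sum += -value;   // "adding a negative value"
        acc.count += -1;     // ...and a negative count
    }

    public static void main(String[] args) {
        AvgAcc acc = new AvgAcc();
        add(2, acc);
        add(4, acc);
        add(6, acc);            // average of {2, 4, 6} is 4.0
        retractViaAdd(6, acc);  // withdraw 6: average of {2, 4} remains
        System.out.println(acc.sum / (double) acc.count);  // prints 3.0
    }
}
```

The design point: because the accumulator is just additive state, an `add()` of a "negative" element is enough to undo a previous contribution, so the interface stays minimal.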


> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} Supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into to pre- and final-aggregation.
> I suggest to add a generic and 

[GitHub] flink pull request #3186: [FLINK-5582] [streaming] Add a general distributiv...

2017-01-22 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97227451
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example of how to use this interface is below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
--- End diff --

My first feeling is to keep the name `add()` because it fits better 
together with the term `Accumulator`. One can view retractions as adding 
negative values. What do you think about that?




[jira] [Commented] (FLINK-5562) Driver fixes

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833603#comment-15833603
 ] 

ASF GitHub Bot commented on FLINK-5562:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3188
  
Looks good. I would like to merge this tomorrow together with #3187 (which 
is time critical for the release).


> Driver fixes
> 
>
> Key: FLINK-5562
> URL: https://issues.apache.org/jira/browse/FLINK-5562
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Improve parametrization and output formatting.





[jira] [Commented] (FLINK-5562) Driver fixes

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833601#comment-15833601
 ] 

ASF GitHub Bot commented on FLINK-5562:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3187
  
Changes look good to me.
As per Robert's email on the mailing list, he is shooting for a new release 
candidate very soon, so we would need to merge this before tomorrow, if 
possible.

Will merge this tomorrow morning unless someone raises an objection till 
then...


> Driver fixes
> 
>
> Key: FLINK-5562
> URL: https://issues.apache.org/jira/browse/FLINK-5562
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Improve parametrization and output formatting.





[jira] [Updated] (FLINK-5376) Misleading log statements in UnorderedStreamElementQueue

2017-01-22 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-5376:
--
Description: 
The following are two examples where ordered stream element queue is mentioned:
{code}
LOG.debug("Put element into ordered stream element queue. New filling 
degree " +
  "({}/{}).", numberEntries, capacity);

return true;
  } else {
LOG.debug("Failed to put element into ordered stream element queue 
because it " +
{code}

I guess OrderedStreamElementQueue was coded first.

  was:
The following are two examples where ordered stream element queue is mentioned:
{code}
LOG.debug("Put element into ordered stream element queue. New filling 
degree " +
  "({}/{}).", numberEntries, capacity);

return true;
  } else {
LOG.debug("Failed to put element into ordered stream element queue 
because it " +
{code}
I guess OrderedStreamElementQueue was coded first.


> Misleading log statements in UnorderedStreamElementQueue
> 
>
> Key: FLINK-5376
> URL: https://issues.apache.org/jira/browse/FLINK-5376
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> The following are two examples where ordered stream element queue is 
> mentioned:
> {code}
> LOG.debug("Put element into ordered stream element queue. New filling 
> degree " +
>   "({}/{}).", numberEntries, capacity);
> return true;
>   } else {
> LOG.debug("Failed to put element into ordered stream element queue 
> because it " +
> {code}
> I guess OrderedStreamElementQueue was coded first.
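The fix itself is a string change, but one copy-proof variant would derive the queue name from the class instead of hard-coding it in the message. A standalone sketch (helper and class names are hypothetical, not from the Flink source):

```java
// Sketch: building the log message from the concrete class name, so code
// copied from OrderedStreamElementQueue to UnorderedStreamElementQueue
// cannot log the wrong queue type.
class StreamElementQueueLogging {

    // Returns the "queue is full" message for the given queue class.
    static String putFailureMessage(Class<?> queueClass, int entries, int capacity) {
        return "Failed to put element into " + queueClass.getSimpleName()
                + " because it is full (" + entries + "/" + capacity + ").";
    }
}
```

With this shape, both queue implementations pass `getClass()` and the message always names the right class.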





[GitHub] flink issue #3188: [FLINK-5562] [gelly] Driver fixes

2017-01-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3188
  
Looks good. I would like to merge this tomorrow together with #3187 (which 
is time critical for the release).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3187: [FLINK-5562] [gelly] Driver fixes for release-1.2

2017-01-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3187
  
Changes look good to me.
As per Robert's email on the mailing list, he is shooting for a new release 
candidate very soon, so we would need to merge this before tomorrow, if 
possible.

Will merge this tomorrow morning unless someone raises an objection till 
then...




[jira] [Updated] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2017-01-22 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4848:
--
Description: 
{code}
  String keystoreFilePath = sslConfig.getString(
ConfigConstants.SECURITY_SSL_KEYSTORE,
null);
...
  try {
keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}

If keystoreFilePath is null, the File ctor would throw NPE.

  was:
{code}
  String keystoreFilePath = sslConfig.getString(
ConfigConstants.SECURITY_SSL_KEYSTORE,
null);
...
  try {
keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}
If keystoreFilePath is null, the File ctor would throw NPE.


> keystoreFilePath should be checked against null in 
> SSLUtils#createSSLServerContext
> --
>
> Key: FLINK-4848
> URL: https://issues.apache.org/jira/browse/FLINK-4848
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   String keystoreFilePath = sslConfig.getString(
> ConfigConstants.SECURITY_SSL_KEYSTORE,
> null);
> ...
>   try {
> keyStoreFile = new FileInputStream(new File(keystoreFilePath));
> {code}
> If keystoreFilePath is null, the File ctor would throw NPE.
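A minimal sketch of the guard the issue asks for. Only `keystoreFilePath` comes from the quoted snippet; the method name and message are hypothetical:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;

class KeystoreGuard {

    // Fail fast with a descriptive message when the keystore path is not
    // configured, instead of letting `new File(null)` throw a bare
    // NullPointerException deep inside SSL context setup.
    static InputStream openKeystore(String keystoreFilePath) throws FileNotFoundException {
        if (keystoreFilePath == null) {
            throw new IllegalArgumentException(
                    "SSL keystore file path is not configured");
        }
        return new FileInputStream(new File(keystoreFilePath));
    }
}
```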





[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833598#comment-15833598
 ] 

ASF GitHub Bot commented on FLINK-5582:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97227303
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example how to use this interface is below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
--- End diff --

Retractions are specific to the Table API. Do you expect this same 
interface to be used for user-defined aggregations there as well?


> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final-aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
>   - 

[GitHub] flink pull request #3186: [FLINK-5582] [streaming] Add a general distributive...

2017-01-22 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97227303
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example how to use this interface is below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
--- End diff --

Retractions are specific to the Table API. Do you expect this same 
interface to be used for user-defined aggregations there as well?




[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833589#comment-15833589
 ] 

ASF GitHub Bot commented on FLINK-5582:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97227249
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example how to use this interface is below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
+
+   OUT getResult(ACC accumulator);
+
+   ACC merge(ACC a, ACC b);
--- End diff --

This could be done, true. It would currently mean a slight overhead (for 
list creation), but that is probably okay.

I would like to address that change in a separate pull request: We should 
probably adjust the merging state implementation as well, to exploit that new 
signature.
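One way such a list-accepting variant could default is a left-fold over the existing two-argument merge. A standalone sketch, reusing the AverageAccumulator example from the pull request (the `mergeAll` name is hypothetical):

```java
import java.util.List;

class MergeAllSketch {

    // Accumulator from the pull request's Javadoc example.
    static class AverageAccumulator {
        long count;
        long sum;
    }

    // The existing two-argument merge.
    static AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    // Hypothetical list-accepting variant: a left-fold over the pairwise
    // merge. A backend with a cheaper group-merge could override this.
    static AverageAccumulator mergeAll(List<AverageAccumulator> accumulators) {
        AverageAccumulator result = accumulators.get(0);
        for (int i = 1; i < accumulators.size(); i++) {
            result = merge(result, accumulators.get(i));
        }
        return result;
    }
}
```

This keeps the two-argument method as the only thing users must implement, while leaving room for the more efficient group merge the comment asks about.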


> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but 

[GitHub] flink pull request #3186: [FLINK-5582] [streaming] Add a general distributive...

2017-01-22 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97227249
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example how to use this interface is below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
+
+   OUT getResult(ACC accumulator);
+
+   ACC merge(ACC a, ACC b);
--- End diff --

This could be done, true. It would currently mean a slight overhead (for 
list creation), but that is probably okay.

I would like to address that change in a separate pull request: We should 
probably adjust the merging state implementation as well, to exploit that new 
signature.




[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833581#comment-15833581
 ] 

ASF GitHub Bot commented on FLINK-5582:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3186
  
Update: The first version had an issue with binary compatibility in the 
Scala DataStream API:

This Scala API accidentally exposed in Flink 1.0 a public method with an 
internal type as a parameter (an internal utility method). That should not have 
been the case, since method compatibility cannot be guaranteed when a parameter 
is a non-public type.

Unfortunately, the automatic tooling for binary compatibility checks is not 
flexible enough to work around that: exclusions for methods do not work if 
parameter types were altered.

Due to that, the newer version rolls back the cleanup commit that renames 
the internal `AggregateFunction`.


> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final-aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregated by merging the accumulated type.
> The proposed interface is below. This type of interface is found in many 
> APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing 
> operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>   ACC createAccumulator();
>   void add(IN value, ACC accumulator);
>   OUT getResult(ACC accumulator);
>   ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
> long count;
> long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Integer value, AverageAccumulator acc) {
> acc.sum += value;
> acc.count++;
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 
> 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
> public AverageAccumulator createAccumulator() {
> return new AverageAccumulator();
> }
> public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator 
> b) {
> a.count += b.count;
> a.sum += b.sum;
> return a;
> }
> public void add(Datum value, AverageAccumulator acc) {
> acc.count += value.getWeight();
> acc.sum += value.getValue();
> }
> public Double getResult(AverageAccumulator acc) {
> return acc.sum / (double) acc.count;
> }
> }
> {code}
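The pre-/final-aggregation split described in the issue can be demonstrated with a small standalone sketch. This is plain Java with the Flink interface replaced by an ordinary class so it compiles in isolation; only the Average logic comes from the issue text:

```java
class DistributiveDemo {

    static class AverageAccumulator {
        long count;
        long sum;
    }

    // The Average example from the issue, written as a plain class
    // (generic parameters would be <Integer, AverageAccumulator, Double>).
    static class Average {
        AverageAccumulator createAccumulator() { return new AverageAccumulator(); }

        void add(Integer value, AverageAccumulator acc) {
            acc.sum += value;
            acc.count++;
        }

        AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }

        Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }
    }

    // Pre-aggregate two partitions independently, then merge the partial
    // accumulators: distributivity means this equals one global aggregation.
    static double hierarchicalAverage(int[] partitionA, int[] partitionB) {
        Average avg = new Average();
        AverageAccumulator accA = avg.createAccumulator();
        for (int v : partitionA) {
            avg.add(v, accA);
        }
        AverageAccumulator accB = avg.createAccumulator();
        for (int v : partitionB) {
            avg.add(v, accB);
        }
        return avg.getResult(avg.merge(accA, accB));
    }
}
```

This is exactly the property {{FoldFunction}} lacks: a fold cannot combine two partial results of the output type, so it cannot be split into pre- and final-aggregation.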





[GitHub] flink issue #3186: [FLINK-5582] [streaming] Add a general distributive aggreg...

2017-01-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3186
  
Update: The first version had an issue with binary compatibility in the 
Scala DataStream API:

This Scala API accidentally exposed in Flink 1.0 a public method with an 
internal type as a parameter (an internal utility method). That should not have 
been the case, since method compatibility cannot be guaranteed when a parameter 
is a non-public type.

Unfortunately, the automatic tooling for binary compatibility checks is not 
flexible enough to work around that: exclusions for methods do not work if 
parameter types were altered.

Due to that, the newer version rolls back the cleanup commit that renames 
the internal `AggregateFunction`.




[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833577#comment-15833577
 ] 

ASF GitHub Bot commented on FLINK-5582:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97226958
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example how to use this interface is below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
+
+   OUT getResult(ACC accumulator);
+
+   ACC merge(ACC a, ACC b);
--- End diff --

Do you think it would be useful to extend the merge function to accept a list 
of ACC: ACC merge(List<ACC> a)? There are cases where merging a list of 
instances as a group is much more efficient than merging only two at a time. 


> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical 

[GitHub] flink pull request #3186: [FLINK-5582] [streaming] Add a general distributive...

2017-01-22 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97226958
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example how to use this interface is below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
+
+   OUT getResult(ACC accumulator);
+
+   ACC merge(ACC a, ACC b);
--- End diff --

Do you think it would be useful to extend the merge function to accept a list 
of ACC: ACC merge(List<ACC> a)? There are cases where merging a list of 
instances as a group is much more efficient than merging only two at a time. 




[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833576#comment-15833576
 ] 

ASF GitHub Bot commented on FLINK-5582:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97226914
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example of how to use this interface is shown below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
--- End diff --

As proposed in https://goo.gl/00ea5j, this function will not only handle 
the accumulate but also the retract. Instead of "add", could you please 
consider using "update"? 
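A retract-capable "update" could look roughly like the sketch below; the flag-based signature is purely illustrative and not what the linked proposal specifies verbatim:

```java
public class RetractableAverageSketch {

    // accumulator from the Average example in the diff above
    public static class Acc {
        public long count;
        public long sum;
    }

    // hypothetical update: accumulates a value or retracts a previously
    // accumulated one, depending on the flag
    public static void update(int value, boolean retract, Acc acc) {
        if (retract) {
            acc.sum -= value;
            acc.count--;
        } else {
            acc.sum += value;
            acc.count++;
        }
    }

    public static double getResult(Acc acc) {
        return acc.sum / (double) acc.count;
    }

    public static void main(String[] args) {
        Acc acc = new Acc();
        update(10, false, acc);
        update(20, false, acc);
        update(10, true, acc); // retract the first value again
        System.out.println(getResult(acc)); // prints "20.0"
    }
}
```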


> Add a general distributive aggregate function
> -
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final-aggregation.
> I suggest to add a generic and powerful 
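The distributive property described above (pre-aggregate partitions, then merge the partials) can be sketched with the Average example from the discussion; the class and method names here are illustrative only:

```java
public class PreAggregationSketch {

    // accumulator and functions from the Average example in the discussion
    public static class Acc {
        public long count;
        public long sum;
    }

    public static void add(int value, Acc acc) {
        acc.sum += value;
        acc.count++;
    }

    public static Acc merge(Acc a, Acc b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    public static double getResult(Acc acc) {
        return acc.sum / (double) acc.count;
    }

    public static void main(String[] args) {
        // pre-aggregate two partitions separately, then merge the partials
        Acc p1 = new Acc();
        for (int v : new int[] {1, 2, 3}) add(v, p1);
        Acc p2 = new Acc();
        for (int v : new int[] {4, 5}) add(v, p2);
        double hierarchical = getResult(merge(p1, p2));

        // aggregate the same data in a single pass
        Acc all = new Acc();
        for (int v : new int[] {1, 2, 3, 4, 5}) add(v, all);

        // distributivity: both paths yield the same average
        System.out.println(hierarchical == getResult(all)); // prints "true"
    }
}
```

A FoldFunction cannot be split this way because its intermediate result type has no merge operation, which is exactly the gap the proposed interface closes.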

[GitHub] flink pull request #3186: [FLINK-5582] [streaming] Add a general distributive...

2017-01-22 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3186#discussion_r97226914
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * Aggregation functions must be {@link Serializable} because they are 
sent around
+ * between distributed processes during distributed execution.
+ * 
+ * An example of how to use this interface is shown below:
+ * 
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ * long count;
+ * long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ * 
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ * 
+ * public void add(Integer value, AverageAccumulator acc) {
+ * acc.sum += value;
+ * acc.count++;
+ * }
+ * 
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 
'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ * public AverageAccumulator createAccumulator() {
+ * return new AverageAccumulator();
+ * }
+ *
+ * public AverageAccumulator merge(AverageAccumulator a, 
AverageAccumulator b) {
+ * a.count += b.count;
+ * a.sum += b.sum;
+ * return a;
+ * }
+ *
+ * public void add(Datum value, AverageAccumulator acc) {
+ * acc.count += value.getWeight();
+ * acc.sum += value.getValue();
+ * }
+ *
+ * public Double getResult(AverageAccumulator acc) {
+ * return acc.sum / (double) acc.count;
+ * }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+   ACC createAccumulator();
+
+   void add(IN value, ACC accumulator);
--- End diff --

As proposed in https://goo.gl/00ea5j, this function will not only handle 
the accumulate but also the retract. Instead of "add", could you please 
consider using "update"? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5576) extend deserialization functions of KvStateRequestSerializer to detect unconsumed bytes

2017-01-22 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-5576.
--
   Resolution: Fixed
Fix Version/s: 1.3.0
   1.2.0

Fixed in 7a0e3d6 (release-1.2), 21742b2 (master).

> extend deserialization functions of KvStateRequestSerializer to detect 
> unconsumed bytes
> ---
>
> Key: FLINK-5576
> URL: https://issues.apache.org/jira/browse/FLINK-5576
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
> Fix For: 1.2.0, 1.3.0
>
>
> KvStateRequestSerializer#deserializeValue and 
> KvStateRequestSerializer#deserializeList both deserialize a given byte array. 
> This is used by clients and unit tests and it is fair to assume that these 
> byte arrays represent a complete value since we do not offer a method to 
> continue reading from the middle of the array anyway. Therefore, we can treat 
> unconsumed bytes as errors, e.g. from a wrong serializer being used, and 
> throw an IOException with an appropriate failure message.
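The check the issue proposes amounts to testing for leftover bytes after deserialization; a simplified stand-in for the real serializer, not Flink's actual implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class UnconsumedBytesSketch {

    // Deserialize a long from the given byte array and treat any leftover
    // bytes as an error, e.g. caused by using the wrong serializer.
    public static long deserializeValue(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        long value = in.readLong();
        if (in.available() > 0) {
            throw new IOException(
                "Unconsumed bytes after deserialization. "
                + "This indicates a mismatch in the serializers used.");
        }
        return value;
    }

    public static void main(String[] args) throws IOException {
        // exactly 8 bytes: deserializes cleanly
        System.out.println(deserializeValue(new byte[] {0, 0, 0, 0, 0, 0, 0, 7})); // prints "7"

        // a trailing extra byte is rejected instead of silently ignored
        try {
            deserializeValue(new byte[] {0, 0, 0, 0, 0, 0, 0, 7, 1});
        } catch (IOException e) {
            System.out.println("rejected"); // prints "rejected"
        }
    }
}
```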



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5559) queryable state: KvStateRequestSerializer#deserializeKeyAndNamespace() throws an IOException without own failure message if deserialisation fails

2017-01-22 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-5559.
--
   Resolution: Fixed
Fix Version/s: 1.3.0
   1.2.0

Fixed in ef13f48 fd63981 f592d4c (release-1.2), c1c6ef1 3fe2cf5 563c3a4 
(master).


> queryable state: KvStateRequestSerializer#deserializeKeyAndNamespace() throws 
> an IOException without own failure message if deserialisation fails
> -
>
> Key: FLINK-5559
> URL: https://issues.apache.org/jira/browse/FLINK-5559
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
> Fix For: 1.2.0, 1.3.0
>
>
> KvStateRequestSerializer#deserializeKeyAndNamespace() throws an IOException, 
> e.g. EOFException, if the deserialisation fails, e.g. there are not enough 
> available bytes.
> In these cases, it should instead throw an IllegalArgumentException with 
> a message containing "This indicates a mismatch in the key/namespace 
> serializers used by the KvState instance and this access.", as in the other 
> error cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5576) extend deserialization functions of KvStateRequestSerializer to detect unconsumed bytes

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833539#comment-15833539
 ] 

ASF GitHub Bot commented on FLINK-5576:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3174


> extend deserialization functions of KvStateRequestSerializer to detect 
> unconsumed bytes
> ---
>
> Key: FLINK-5576
> URL: https://issues.apache.org/jira/browse/FLINK-5576
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>
> KvStateRequestSerializer#deserializeValue and 
> KvStateRequestSerializer#deserializeList both deserialize a given byte array. 
> This is used by clients and unit tests and it is fair to assume that these 
> byte arrays represent a complete value since we do not offer a method to 
> continue reading from the middle of the array anyway. Therefore, we can treat 
> unconsumed bytes as errors, e.g. from a wrong serializer being used, and 
> throw an IOException with an appropriate failure message.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5528) tests: reduce the retry delay in QueryableStateITCase

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833538#comment-15833538
 ] 

ASF GitHub Bot commented on FLINK-5528:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3139


> tests: reduce the retry delay in QueryableStateITCase
> -
>
> Key: FLINK-5528
> URL: https://issues.apache.org/jira/browse/FLINK-5528
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.2.0, 1.3.0
>
>
> The QueryableStateITCase uses a retry of 1 second, e.g. if a queried key does 
> not exist yet. This seems a bit too conservative as the job may not take that 
> long to deploy and especially since getKvStateWithRetries() recovers from 
> failures by retrying.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5559) queryable state: KvStateRequestSerializer#deserializeKeyAndNamespace() throws an IOException without own failure message if deserialisation fails

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833540#comment-15833540
 ] 

ASF GitHub Bot commented on FLINK-5559:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3172


> queryable state: KvStateRequestSerializer#deserializeKeyAndNamespace() throws 
> an IOException without own failure message if deserialisation fails
> -
>
> Key: FLINK-5559
> URL: https://issues.apache.org/jira/browse/FLINK-5559
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>
> KvStateRequestSerializer#deserializeKeyAndNamespace() throws an IOException, 
> e.g. EOFException, if the deserialisation fails, e.g. there are not enough 
> available bytes.
> In these cases, it should instead throw an IllegalArgumentException with 
> a message containing "This indicates a mismatch in the key/namespace 
> serializers used by the KvState instance and this access.", as in the other 
> error cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3172: [FLINK-5559] let KvStateRequestSerializer#deserial...

2017-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3172


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3174: [FLINK-5576] extend deserialization functions of K...

2017-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3174


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3139: [FLINK-5528][query][tests] reduce the retry delay ...

2017-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3139


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5607) Move location lookup retry out of KvStateLocationLookupService

2017-01-22 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5607:
--

 Summary: Move location lookup retry out of 
KvStateLocationLookupService
 Key: FLINK-5607
 URL: https://issues.apache.org/jira/browse/FLINK-5607
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi


If a state location lookup fails because of an {{UnknownJobManager}}, the 
lookup service will automagically retry this. I think it's better to move this 
out of the lookup service and have the retry handled outside by the caller.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5606) Remove magic number in key and namespace serialization

2017-01-22 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5606:
--

 Summary: Remove magic number in key and namespace serialization
 Key: FLINK-5606
 URL: https://issues.apache.org/jira/browse/FLINK-5606
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
Priority: Minor


The serialized key and namespace for state queries contains a magic number 
between the key and namespace: {{key|42|namespace}}. This was for historical 
reasons in order to skip deserialization of the key and namespace for our old 
{{RocksDBStateBackend}} which used the same format. This has now been 
superseded by the keygroup aware state backends and there is no point in doing 
this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5605) Make KvStateRequestSerializer package private

2017-01-22 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi reassigned FLINK-5605:
--

Assignee: Ufuk Celebi

> Make KvStateRequestSerializer package private 
> --
>
> Key: FLINK-5605
> URL: https://issues.apache.org/jira/browse/FLINK-5605
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Minor
>
> From early users I've seen that many people use the KvStateRequestSerializer 
> in their programs. This was actually meant as an internal package to be used 
> by the client and server for internal message serialization.
> I vote to make this package private and create an explicit 
> {{QueryableStateClientUtil}} for user serialization.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5605) Make KvStateRequestSerializer package private

2017-01-22 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5605:
--

 Summary: Make KvStateRequestSerializer package private 
 Key: FLINK-5605
 URL: https://issues.apache.org/jira/browse/FLINK-5605
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Reporter: Ufuk Celebi
Priority: Minor


From early users I've seen that many people use the KvStateRequestSerializer 
in their programs. This was actually meant as an internal package to be used 
by the client and server for internal message serialization.

I vote to make this package private and create an explicit 
{{QueryableStateClientUtil}} for user serialization.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5604) Replace QueryableStateClient constructor

2017-01-22 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5604:
--

 Summary: Replace QueryableStateClient constructor
 Key: FLINK-5604
 URL: https://issues.apache.org/jira/browse/FLINK-5604
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi


The {{QueryableStateClient}} constructor expects a configuration object which 
makes it very hard for users to see what's expected to be there and what not.

I propose to split this constructor up and add some static helper for the 
common cases.
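The proposed split could look roughly like the sketch below; the helper names, the host/port fields, and the defaults are invented for illustration and are not the actual QueryableStateClient API:

```java
public class ClientFactorySketch {

    public final String host;
    public final int port;

    // the single config-object constructor is replaced by a private one...
    private ClientFactorySketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // ...plus explicit static helpers for the common cases, so the required
    // settings are visible in the method signature instead of buried in a
    // configuration object (names are hypothetical)
    public static ClientFactorySketch forJobManager(String host, int port) {
        return new ClientFactorySketch(host, port);
    }

    public static ClientFactorySketch withDefaults() {
        return forJobManager("localhost", 6123); // illustrative defaults
    }

    public static void main(String[] args) {
        ClientFactorySketch client = ClientFactorySketch.withDefaults();
        System.out.println(client.host + ":" + client.port); // prints "localhost:6123"
    }
}
```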



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5509) Replace QueryableStateClient keyHashCode argument

2017-01-22 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi reassigned FLINK-5509:
--

Assignee: Ufuk Celebi

> Replace QueryableStateClient keyHashCode argument
> -
>
> Key: FLINK-5509
> URL: https://issues.apache.org/jira/browse/FLINK-5509
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Minor
>
> When going over the low level QueryableStateClient with [~NicoK] we noticed 
> that the key hashCode argument can be confusing to users:
> {code}
> Future<byte[]> getKvState(
>   JobID jobId,
>   String name,
>   int keyHashCode,
>   byte[] serializedKeyAndNamespace)
> {code}
> The {{keyHashCode}} argument is the result of calling {{hashCode()}} on the 
> key to look up. This is what is sent to the JobManager in order to look up 
> the location of the key. While pretty straightforward, it is repetitive and 
> possibly confusing.
> As an alternative we suggest to make the method generic and simply call 
> hashCode on the object ourselves. This way the user just provides the key 
> object.
> Since there are some early users of the queryable state API already, we would 
> suggest to rename the method in order to provoke a compilation error after 
> upgrading to the actually released 1.2 version.
> (This would also work without renaming since the hashCode of Integer (what 
> users currently provide) is the same number, but it would be confusing why it 
> actually works.)
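The suggested change boils down to making the method generic and computing the hash internally; a minimal sketch with invented names, where a string stands in for the actual JobManager location lookup:

```java
public class KvStateLookupSketch {

    // current shape: the caller must pass key.hashCode() explicitly,
    // which is repetitive and easy to get wrong
    public static String lookupByHash(int keyHashCode) {
        return "location-for-" + keyHashCode; // stand-in for the real lookup
    }

    // proposed shape: a generic method that calls hashCode() itself,
    // so the caller just passes the key object
    public static <K> String lookup(K key) {
        return lookupByHash(key.hashCode());
    }

    public static void main(String[] args) {
        // Integer.hashCode() is the int value itself, which is why old code
        // passing the raw int would keep working even without a rename
        System.out.println(lookup(42).equals(lookupByHash(42))); // prints "true"
    }
}
```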



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5603) Use Flink's futures in QueryableStateClient

2017-01-22 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5603:
--

 Summary: Use Flink's futures in QueryableStateClient
 Key: FLINK-5603
 URL: https://issues.apache.org/jira/browse/FLINK-5603
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
Priority: Minor


The current {{QueryableStateClient}} exposes Scala's Futures as the return type 
for queries. Since we are trying to get away from hard Scala dependencies in 
the current master, we should proactively replace this with Flink's Future 
interface.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4220) ClientTest fails after port conflict

2017-01-22 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-4220.
--
Resolution: Cannot Reproduce

> ClientTest fails after port conflict
> 
>
> Key: FLINK-4220
> URL: https://issues.apache.org/jira/browse/FLINK-4220
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Ufuk Celebi
>  Labels: test-stability
>
> {code}
> Running org.apache.flink.client.program.ClientTest
> org.jboss.netty.channel.ChannelException: Failed to bind to: /127.0.0.1:42087
>   at 
> org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
>   at 
> akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
>   at 
> akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
>   at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
>   at scala.util.Try$.apply(Try.scala:161)
>   at scala.util.Success.map(Try.scala:206)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>   at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>   at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>   at 
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>   at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.net.BindException: Address already in use
>   at sun.nio.ch.Net.bind0(Native Method)
>   at sun.nio.ch.Net.bind(Net.java:444)
>   at sun.nio.ch.Net.bind(Net.java:436)
>   at 
> sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
>   at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
>   at 
> org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
>   at 
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
>   at 
> org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> ...
> Failed tests: 
>   ClientTest.setUp:107 Setup of test actor system failed.
> {code}
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/144842320/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5116) Missing RocksDB state backend factory

2017-01-22 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-5116.
--
Resolution: Won't Fix

Implemented in >= 1.2

> Missing RocksDB state backend factory
> -
>
> Key: FLINK-5116
> URL: https://issues.apache.org/jira/browse/FLINK-5116
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.3
>Reporter: Ufuk Celebi
>Priority: Critical
>
> The 1.1 series misses a factory for the RocksDB state backend. This means that 
> 1.1 users cannot configure RocksDB via the config file and are forced to 
> set it via the environment.
> This has been recently added to 1.2-SNAPSHOT. It should be back ported to 
> 1.1.4 
> (https://github.com/apache/flink/blob/master/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5436) UDF state without CheckpointedRestoring can result in restarting loop

2017-01-22 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833467#comment-15833467
 ] 

Ufuk Celebi commented on FLINK-5436:


[~stefanrichte...@gmail.com] and [~aljoscha] should we address this and 
FLINK-5437 for the upcoming release or not?

> UDF state without CheckpointedRestoring can result in restarting loop
> -
>
> Key: FLINK-5436
> URL: https://issues.apache.org/jira/browse/FLINK-5436
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Minor
>
> When restoring a job with Checkpointed state and not implementing the new 
> CheckpointedRestoring interface, the job will be restarted over and over 
> again (given the respective restarting strategy).
> Since this is not recoverable, we should immediately fail the job.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5530) race condition in AbstractRocksDBState#getSerializedValue

2017-01-22 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-5530.
--
   Resolution: Fixed
Fix Version/s: 1.3.0
   1.2.0

Fixed in d8222c1 (release-1.2), d16552d (master).

> race condition in AbstractRocksDBState#getSerializedValue
> -
>
> Key: FLINK-5530
> URL: https://issues.apache.org/jira/browse/FLINK-5530
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.2.0, 1.3.0
>
>
> AbstractRocksDBState#getSerializedValue() uses the same key serialisation 
> stream as the ordinary state access methods but is called in parallel during 
> state queries thus violating the assumption of only one thread accessing it. 
> This may lead to either wrong results in queries or corrupt data while 
> queries are executed.
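The general direction of such a fix is to stop sharing one serialization stream between the state-access and query threads; a simplified illustration only, since the actual patch may use a different mechanism (e.g. duplicating the serializer):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class PerCallSerializerSketch {

    // Each call builds its own stream and buffer, so concurrent callers
    // never touch shared mutable state and cannot corrupt each other's keys.
    public static byte[] serializeKey(long key) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeLong(key);
        out.flush();
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] first = serializeKey(42L);
        byte[] second = serializeKey(42L);
        // independent buffers, identical content
        System.out.println(first.length + " " + java.util.Arrays.equals(first, second)); // prints "8 true"
    }
}
```

The trade-off is a small allocation per call in exchange for thread safety; a ThreadLocal buffer would be the usual middle ground if the allocation mattered.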



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5530) race condition in AbstractRocksDBState#getSerializedValue

2017-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833448#comment-15833448
 ] 

ASF GitHub Bot commented on FLINK-5530:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3143


> race condition in AbstractRocksDBState#getSerializedValue
> -
>
> Key: FLINK-5530
> URL: https://issues.apache.org/jira/browse/FLINK-5530
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.2.0, 1.3.0
>
>
> AbstractRocksDBState#getSerializedValue() uses the same key serialisation 
> stream as the ordinary state access methods but is called in parallel during 
> state queries thus violating the assumption of only one thread accessing it. 
> This may lead to either wrong results in queries or corrupt data while 
> queries are executed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...

2017-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3143


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---