[jira] [Updated] (FLINK-5546) java.io.tmpdir set to project build directory in surefire plugin
     [ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shijinkui updated FLINK-5546:
-----------------------------
    Summary: java.io.tmpdir set to project build directory in surefire plugin  (was: When multiple users run tests, /tmp/cacheFile conflicts)

> java.io.tmpdir set to project build directory in surefire plugin
> ----------------------------------------------------------------
>
>                 Key: FLINK-5546
>                 URL: https://issues.apache.org/jira/browse/FLINK-5546
>             Project: Flink
>          Issue Type: Test
>    Affects Versions: 1.2.0, 1.3.0
>         Environment: CentOS 7.2
>            Reporter: Syinchwun Leo
>            Assignee: shijinkui
>
> When multiple Linux users run tests at the same time, the flink-runtime module
> may fail: User A creates /tmp/cacheFile, and User B then has no permission to
> access the folder.
>
> Failed tests:
>   FileCacheDeleteValidationTest.setup:79 Error initializing the test:
>   /tmp/cacheFile (Permission denied)
> Tests in error:
>   IOManagerTest.channelEnumerator:54 » Runtime Could not create storage director...
>
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
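The retitled summary describes the fix: point Surefire's `java.io.tmpdir` at the per-project build directory so concurrent users never share `/tmp`. A minimal sketch of such a Surefire configuration, using the plugin's standard `systemPropertyVariables` mechanism (illustrative, not necessarily the exact change that was merged):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <systemPropertyVariables>
      <!-- Redirect test temp files from the shared /tmp into the per-build
           target directory; `mvn clean` then removes them with everything else. -->
      <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
    </systemPropertyVariables>
  </configuration>
</plugin>
```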
[jira] [Commented] (FLINK-5546) When multiple users run test, /tmp/cacheFile conflicts
[ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834014#comment-15834014 ]

ASF GitHub Bot commented on FLINK-5546:
---------------------------------------

GitHub user shijinkui opened a pull request:

    https://github.com/apache/flink/pull/3190

    [FLINK-5546][build] When multiple users run test, /tmp/cacheFile conf…

    The unit tests create files in java.io.tmpdir, which defaults to `/tmp` — a
    directory shared by all users and limited in capacity. Creating the temporary
    files in the project target directory instead is more reasonable; they are
    then deleted by `mvn clean`.

    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-5546] When multiple users run test, /tmp/cacheFile conflicts")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shijinkui/flink FLINK-5546-tmp

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3190.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3190

----
commit 3ec804a5f1de61f00ac49b1d4ed0ef005123daae
Author: shijinkui
Date:   2017-01-23T07:37:44Z

    [FLINK-5546][build] When multiple users run test, /tmp/cacheFile conflicts
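To see why a fixed file name under a shared `java.io.tmpdir` collides between users, and why pointing the property at a per-build directory avoids it, here is a small self-contained sketch (the `cacheFile` name mirrors the failing test; everything else is illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TmpDirDemo {
    // Resolve a fixed-name cache file under java.io.tmpdir, as the failing test
    // does. With the default /tmp, every user on the machine resolves the SAME
    // path, so whoever creates it first owns it and later users hit
    // "Permission denied".
    static Path cacheFile() {
        return Paths.get(System.getProperty("java.io.tmpdir"), "cacheFile");
    }

    public static void main(String[] args) throws IOException {
        // Pointing java.io.tmpdir at a per-build directory (as the Surefire fix
        // does) gives each build its own, non-conflicting path.
        System.setProperty("java.io.tmpdir", Files.createTempDirectory("build-a").toString());
        Path a = cacheFile();
        System.setProperty("java.io.tmpdir", Files.createTempDirectory("build-b").toString());
        Path b = cacheFile();
        System.out.println(a.equals(b)); // false: the two builds no longer collide
    }
}
```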
[jira] [Commented] (FLINK-3318) Add support for quantifiers to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834008#comment-15834008 ]

ASF GitHub Bot commented on FLINK-3318:
---------------------------------------

Github user chermenin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97264252

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java ---
    @@ -43,7 +43,14 @@
     	public State(final String name, final StateType stateType) {
     		this.name = name;
     		this.stateType = stateType;
    -		stateTransitions = new ArrayList();
    +		stateTransitions = new ArrayList<>();
    +	}
    +
    +	public State(String name, StateType stateType, Collection stateTransitions) {
    --- End diff --

    It seems that this constructor is never used. What is it for?

> Add support for quantifiers to CEP's pattern API
> ------------------------------------------------
>
>                 Key: FLINK-3318
>                 URL: https://issues.apache.org/jira/browse/FLINK-3318
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.0.0
>            Reporter: Till Rohrmann
>            Assignee: Ivan Mushketyk
>            Priority: Minor
>
> It would be a good addition to extend the pattern API to support quantifiers
> known from regular expressions (e.g. Kleene star, ?, +, or count bounds).
> This would considerably enrich the set of supported patterns.
>
> Implementing the count bounds could be done by unrolling the pattern state.
> In order to support the Kleene star operator, the {{NFACompiler}} has to be
> extended to insert epsilon-transitions between a Kleene star state and the
> succeeding pattern state. In order to support {{?}}, one could insert two
> paths from the preceding state, one which accepts the event and another which
> directly goes into the next pattern state.
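The quantifier semantics the issue wants to borrow are the familiar regular-expression ones. As a reference point (not CEP API — just the semantics being targeted), here is how `java.util.regex` treats each quantifier mentioned above:

```java
import java.util.regex.Pattern;

public class QuantifierSemantics {
    public static void main(String[] args) {
        // Kleene star: zero or more occurrences
        System.out.println(Pattern.matches("ab*c", "ac"));      // true
        // '?': optional, i.e. zero or one occurrence
        System.out.println(Pattern.matches("ab?c", "abbc"));    // false, two b's
        // '+': one or more occurrences
        System.out.println(Pattern.matches("ab+c", "abbbc"));   // true
        // Count bounds: between 2 and 3 occurrences — the case the issue
        // suggests implementing by unrolling the pattern state
        System.out.println(Pattern.matches("ab{2,3}c", "abc")); // false, only one b
    }
}
```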
[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading
[ https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833957#comment-15833957 ]

ASF GitHub Bot commented on FLINK-5508:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3158

    This is an identical backport / forwardport of the other 3 Mesos fixes, correct?
    If so: #3155 and #3156 both have +1s, and #3157 is also a +1 once the minor
    code style comments are addressed, so this is a +1 from my side as well.

> Remove Mesos dynamic class loading
> ----------------------------------
>
>                 Key: FLINK-5508
>                 URL: https://issues.apache.org/jira/browse/FLINK-5508
>             Project: Flink
>          Issue Type: Improvement
>          Components: Mesos
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>             Fix For: 1.2.0, 1.3.0
>
> Mesos uses dynamic class loading in order to load the
> {{ZooKeeperStateHandleStore}} and the {{CuratorFramework}} class. This can be
> replaced by a compile-time dependency.
[GitHub] flink issue #3158: [backport] [FLINK-5508] [FLINK-5496] [FLINK-5495] Fix Mes...
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3158

    This is an identical backport / forwardport of the other 3 Mesos fixes, correct?
    If so: #3155 and #3156 both have +1s, and #3157 is also a +1 once the minor
    code style comments are addressed, so this is a +1 from my side as well.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading
[ https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833953#comment-15833953 ]

ASF GitHub Bot commented on FLINK-5508:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3157#discussion_r97253388

    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---
    @@ -394,6 +395,14 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
     			}
     		}
     
    +		if (mesosServices != null) {
    +			try {
    +				mesosServices.close(false);
    +			} catch (Throwable tt) {
    +				LOG.error("Error closing the ZooKeeperUtilityFactory.", tt);
    --- End diff --

    This error message is only relevant to `ZooKeeperMesosServices`, not to
    `StandaloneMesosServices`. Should we just use "Failed to clean up and close
    MesosServices." here, and rely on the stack trace in the logs to find out
    which case it was?
[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading
[ https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833949#comment-15833949 ]

ASF GitHub Bot commented on FLINK-5508:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3157#discussion_r97256203

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java ---
    @@ -0,0 +1,43 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.zookeeper;
    +
    +import org.apache.curator.framework.recipes.shared.VersionedValue;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Wrapper class for a {@link VersionedValue} so that we don't expose a curator dependency in our
    + * internal APIs. Such an exposure is problematic due to the relocation of curator.
    + */
    +public class ZooKeeperVersionedValue {
    +
    +	private final VersionedValue versionedValue;
    +
    +	public ZooKeeperVersionedValue(VersionedValue versionedValue) {
    +		this.versionedValue = Preconditions.checkNotNull(versionedValue);
    +	}
    +
    +	VersionedValue getVersionedValue() {
    --- End diff --

    nitpick: I would place the public methods before any other package-private /
    private access methods.
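The `ZooKeeperVersionedValue` class under review applies a general idiom: wrap a relocated third-party type so it never appears in public signatures. A stripped-down re-creation of that idiom (the type and method names here are stand-ins, not Flink's actual classes):

```java
public class WrapperIdiom {

    // Stand-in for the relocated third-party type (curator's VersionedValue).
    // Deliberately package-private: callers outside this package never see it.
    static final class ThirdPartyVersionedValue<T> {
        final int version;
        final T value;
        ThirdPartyVersionedValue(int version, T value) {
            this.version = version;
            this.value = value;
        }
    }

    // The wrapper is what the public API speaks in.
    public static final class VersionedValueWrapper<T> {
        private final ThirdPartyVersionedValue<T> delegate;

        VersionedValueWrapper(ThirdPartyVersionedValue<T> delegate) {
            if (delegate == null) {
                throw new NullPointerException("delegate");
            }
            this.delegate = delegate;
        }

        // Public accessors expose only plain types...
        public int getVersion() { return delegate.version; }
        public T getValue() { return delegate.value; }

        // ...while the third-party type leaks only at package visibility,
        // matching the package-private getter flagged in the review.
        ThirdPartyVersionedValue<T> unwrap() { return delegate; }
    }
}
```

The payoff is that shading/relocating the third-party dependency later cannot break downstream code, because no external signature mentions the relocated class.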
[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading
[ https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833952#comment-15833952 ]

ASF GitHub Bot commented on FLINK-5508:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3157#discussion_r97255788

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---
    @@ -376,15 +376,19 @@ public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(
     		}
     	}
     
    -	private static String generateZookeeperPath(String root, String namespace) {
    +	public static String generateZookeeperPath(String root, String namespace) {
     		if (!namespace.startsWith("/")) {
    -			namespace = "/" + namespace;
    +			namespace = '/' + namespace;
     		}
     
     		if (namespace.endsWith("/")) {
     			namespace = namespace.substring(0, namespace.length() - 1);
     		}
     
    +		if (root.endsWith("/")) {
    --- End diff --

    Nice catch.
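For reference, the patched path-joining logic can be reconstructed as a standalone method. The lines the diff does not show (the trailing-slash strip on `root` and the final concatenation) are assumptions here, inferred from the visible hunk:

```java
public class ZkPaths {

    // Reconstruction of the patched generateZookeeperPath: ensure the namespace
    // has a leading slash and no trailing slash, and — the fix in the diff —
    // strip a trailing slash from the root so the join never yields "//".
    public static String generateZookeeperPath(String root, String namespace) {
        if (!namespace.startsWith("/")) {
            namespace = '/' + namespace;
        }

        if (namespace.endsWith("/")) {
            namespace = namespace.substring(0, namespace.length() - 1);
        }

        if (root.endsWith("/")) {
            root = root.substring(0, root.length() - 1);
        }

        return root + namespace; // assumed return; not shown in the hunk
    }
}
```

With the trailing-slash check, `generateZookeeperPath("/flink/", "/cluster_one")` produces `/flink/cluster_one` rather than the malformed `/flink//cluster_one`.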
[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading
[ https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833948#comment-15833948 ]

ASF GitHub Bot commented on FLINK-5508:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3157#discussion_r97255571

    --- Diff: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java ---
    @@ -73,13 +74,16 @@
     /**
      * General tests for the Mesos resource manager component.
      */
    -public class MesosFlinkResourceManagerTest {
    +public class MesosFlinkResourceManagerTest extends TestLogger {
     
     	private static final Logger LOG = LoggerFactory.getLogger(MesosFlinkResourceManagerTest.class);
     
     	private static ActorSystem system;
     
    -	private static Configuration config = new Configuration() {{
    +	private static Configuration config = new Configuration() {
    +		private static final long serialVersionUID = -952579203067648838L;
    +
    +		{
    --- End diff --

    The indentation in this initializer block seems to be off: the following two
    `setInteger` calls should be indented with one more tab.
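The field under discussion uses "double-brace initialization": an anonymous subclass plus an instance-initializer block. Because the anonymous class is a distinct `Serializable` type, it needs its own `serialVersionUID`, which is exactly what the diff adds. A minimal illustration, with `HashMap` standing in for Flink's `Configuration` (keys and values are made up for the example):

```java
import java.util.HashMap;

public class DoubleBraceInit {

    // Anonymous HashMap subclass; the inner {...} is an instance initializer
    // that runs at construction time and populates the object.
    static final HashMap<String, Integer> config = new HashMap<String, Integer>() {
        // Required because this anonymous class is a new Serializable type;
        // without it, the compiler emits a serialization warning and the
        // generated UID becomes compiler-dependent.
        private static final long serialVersionUID = 1L;

        {
            put("taskmanager.numberOfTaskSlots", 2);
            put("taskmanager.memory.size", 1024);
        }
    };
}
```

Note that the idiom trades conciseness for an extra class per use site, which is why many code bases prefer a plain static initializer or builder instead.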
[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading
[ https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833951#comment-15833951 ]

ASF GitHub Bot commented on FLINK-5508:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3157#discussion_r97257878

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java ---
    @@ -0,0 +1,120 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.zookeeper;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.recipes.shared.SharedCount;
    +import org.apache.curator.framework.recipes.shared.SharedValue;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.util.ZooKeeperUtils;
    +
    +import java.io.Serializable;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Creates ZooKeeper utility classes without exposing the {@link CuratorFramework} dependency. The
    + * curator framework is cached in this instance and shared among all created ZooKeeper utility
    + * instances. This requires that the utility classes DO NOT close the provided curator framework.
    + *
    + * The curator framework is closed by calling the {@link #close(boolean)} method.
    + */
    +public class ZooKeeperUtilityFactory {
    +
    +	private final CuratorFramework root;
    +
    +	// Facade bound to the provided path
    +	private final CuratorFramework facade;
    +
    +	public ZooKeeperUtilityFactory(Configuration configuration, String path) throws Exception {
    +		root = ZooKeeperUtils.startCuratorFramework(configuration);
    +
    +		root.newNamespaceAwareEnsurePath(path).ensure(root.getZookeeperClient());
    +		facade = root.usingNamespace(ZooKeeperUtils.generateZookeeperPath(root.getNamespace(), path));
    +	}
    +
    +	/**
    +	 * Closes the ZooKeeperUtilityFactory. This entails closing the cached {@link CuratorFramework}
    +	 * instance. If cleanup is true, then the initial path and all its children are deleted.
    +	 *
    +	 * @param cleanup deletes the initial path and all of its children to clean up
    +	 * @throws Exception when deleting the znodes
    +	 */
    +	public void close(boolean cleanup) throws Exception {
    +		if (cleanup) {
    +			facade.delete().deletingChildrenIfNeeded().forPath("/");
    +		}
    +
    +		root.close();
    +	}
    +
    +	/**
    +	 * Creates a {@link ZooKeeperStateHandleStore} instance with the provided arguments.
    +	 *
    +	 * @param zkStateHandleStorePath specifying the path in ZooKeeper to store the state handles to
    +	 * @param stateStorageHelper storing the actual state data
    +	 * @param executor to run asynchronous callbacks of the state handle store
    +	 * @param <T> Type of the state to be stored
    +	 * @return a ZooKeeperStateHandleStore instance
    +	 * @throws Exception if ZooKeeper could not create the provided state handle store path in
    +	 * ZooKeeper
    --- End diff --

    nit: Does this line in the Javadoc need to be split into 2 lines? I don't
    think it's exceeding the max character limit, is it?
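The `close(boolean cleanup)` contract reviewed above — optionally wipe the managed state, then always release the underlying resource — can be sketched without ZooKeeper. Everything here is illustrative: a plain list stands in for the znodes under the initial path, and a flag for the cached client:

```java
import java.util.ArrayList;
import java.util.List;

public class CleanupOnClose {
    // Stand-in for the znodes created under the factory's initial path.
    private final List<String> znodes = new ArrayList<>();
    // Stand-in for the cached CuratorFramework client.
    private boolean closed;

    public void create(String path) {
        znodes.add(path);
    }

    public void close(boolean cleanup) {
        if (cleanup) {
            znodes.clear(); // delete the initial path and all of its children
        }
        closed = true;      // close the cached client in either case
    }

    public boolean isClosed() { return closed; }
    public int remaining() { return znodes.size(); }
}
```

The key design point mirrored from the diff is that the resource is released unconditionally, while destroying the persisted state is the caller's explicit opt-in.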
[jira] [Commented] (FLINK-5508) Remove Mesos dynamic class loading
[ https://issues.apache.org/jira/browse/FLINK-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833950#comment-15833950 ]

ASF GitHub Bot commented on FLINK-5508:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3157#discussion_r97257309

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java ---
    @@ -0,0 +1,120 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.zookeeper;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.recipes.shared.SharedCount;
    +import org.apache.curator.framework.recipes.shared.SharedValue;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.util.ZooKeeperUtils;
    +
    +import java.io.Serializable;
    +import java.util.concurrent.Executor;
    +
    +/**
    + * Creates ZooKeeper utility classes without exposing the {@link CuratorFramework} dependency. The
    + * curator framework is cached in this instance and shared among all created ZooKeeper utility
    + * instances. This requires that the utility classes DO NOT close the provided curator framework.
    + *
    + * The curator framework is closed by calling the {@link #close(boolean)} method.
    + */
    +public class ZooKeeperUtilityFactory {
    +
    +	private final CuratorFramework root;
    +
    +	// Facade bound to the provided path
    +	private final CuratorFramework facade;
    +
    +	public ZooKeeperUtilityFactory(Configuration configuration, String path) throws Exception {
    --- End diff --

    Should perform precondition checks for arguments.
[GitHub] flink pull request #3157: [FLINK-5508] [mesos] Introduce ZooKeeperUtilityFac...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3157#discussion_r97253388

    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---
    @@ -394,6 +395,14 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
     			}
     		}
     
    +		if (mesosServices != null) {
    +			try {
    +				mesosServices.close(false);
    +			} catch (Throwable tt) {
    +				LOG.error("Error closing the ZooKeeperUtilityFactory.", tt);
    --- End diff --

    This error message is only relevant to `ZooKeeperMesosServices`, not to
    `StandaloneMesosServices`. Should we just use "Failed to clean up and close
    MesosServices." here, and rely on the stack trace in the logs to find out
    which case it was?
[GitHub] flink pull request #3157: [FLINK-5508] [mesos] Introduce ZooKeeperUtilityFac...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3157#discussion_r97256203 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.util.Preconditions; + +/** + * Wrapper class for a {@link VersionedValue} so that we don't expose a curator dependency in our + * internal APIs. Such an exposure is problematic due to the relocation of curator. + */ +public class ZooKeeperVersionedValue<T> { + + private final VersionedValue<T> versionedValue; + + public ZooKeeperVersionedValue(VersionedValue<T> versionedValue) { + this.versionedValue = Preconditions.checkNotNull(versionedValue); + } + + VersionedValue<T> getVersionedValue() { --- End diff -- nitpick: I would place the public methods before any other package-private / private access methods.
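The wrapper approach reviewed above can be sketched outside of Flink, assuming a minimal stand-in for Curator's versioned-value type (the `SimpleVersionedValue` class below is hypothetical, not the real Curator class, and `WrappedVersionedValue` is a made-up name):

```java
import java.util.Objects;

// Hypothetical stand-in for Curator's VersionedValue: a value plus the znode
// version at which it was read. Not the real Curator class.
class SimpleVersionedValue<T> {
    private final int version;
    private final T value;

    SimpleVersionedValue(int version, T value) {
        this.version = version;
        this.value = value;
    }

    int getVersion() { return version; }
    T getValue() { return value; }
}

// Wrapper in the spirit of ZooKeeperVersionedValue: the wrapped library type
// never appears in the public accessors, so callers do not compile against
// the (relocated) dependency.
class WrappedVersionedValue<T> {
    private final SimpleVersionedValue<T> versionedValue;

    WrappedVersionedValue(SimpleVersionedValue<T> versionedValue) {
        this.versionedValue = Objects.requireNonNull(versionedValue);
    }

    T getValue() { return versionedValue.getValue(); }
    int getVersion() { return versionedValue.getVersion(); }

    // package-private raw accessor, only for code that genuinely needs it
    SimpleVersionedValue<T> getVersionedValue() { return versionedValue; }
}
```

The design choice being reviewed: only the package-private `getVersionedValue()` exposes the underlying type, which is why its placement relative to the public methods matters for readability.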
[GitHub] flink pull request #3157: [FLINK-5508] [mesos] Introduce ZooKeeperUtilityFac...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3157#discussion_r97255788 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java --- @@ -376,15 +376,19 @@ public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter( } } - private static String generateZookeeperPath(String root, String namespace) { + public static String generateZookeeperPath(String root, String namespace) { if (!namespace.startsWith("/")) { - namespace = "/" + namespace; + namespace = '/' + namespace; } if (namespace.endsWith("/")) { namespace = namespace.substring(0, namespace.length() - 1); } + if (root.endsWith("/")) { --- End diff -- Nice catch ..
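For readers skimming the thread, the normalization under review can be sketched stand-alone (the class name `ZkPathSketch` is made up; the method body mirrors the diff above, including the newly added trailing-slash check on `root`):

```java
class ZkPathSketch {
    // Mirrors generateZookeeperPath from the diff: normalize the namespace to
    // "/name" form, then strip a trailing slash from root so concatenation
    // never produces a double slash.
    static String generateZookeeperPath(String root, String namespace) {
        if (!namespace.startsWith("/")) {
            namespace = '/' + namespace;
        }
        if (namespace.endsWith("/")) {
            namespace = namespace.substring(0, namespace.length() - 1);
        }
        if (root.endsWith("/")) {
            // the check added in this PR (the "nice catch")
            root = root.substring(0, root.length() - 1);
        }
        return root + namespace;
    }
}
```

Without the last check, `generateZookeeperPath("/flink/", "cluster")` would yield `/flink//cluster`, which ZooKeeper rejects as an invalid path.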
[GitHub] flink pull request #3157: [FLINK-5508] [mesos] Introduce ZooKeeperUtilityFac...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3157#discussion_r97255571 --- Diff: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java --- @@ -73,13 +74,16 @@ /** * General tests for the Mesos resource manager component. */ -public class MesosFlinkResourceManagerTest { +public class MesosFlinkResourceManagerTest extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(MesosFlinkResourceManagerTest.class); private static ActorSystem system; - private static Configuration config = new Configuration() {{ + private static Configuration config = new Configuration() { + private static final long serialVersionUID = -952579203067648838L; + + { --- End diff -- The indentation in this static block seems to be disordered (the following 2 `setInteger`s should be indented with one more tab).
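Background on why a `serialVersionUID` appears in this diff: `new Configuration() {{ ... }}` is double-brace initialization, which defines an anonymous subclass with an instance-initializer block; since `Configuration` is `Serializable`, the anonymous subclass should declare its own `serialVersionUID`. A minimal sketch using `HashMap` instead of Flink's `Configuration` (the keys are made-up examples):

```java
import java.util.HashMap;

class DoubleBraceDemo {
    // Double-brace initialization: the outer braces open an anonymous HashMap
    // subclass, the inner braces are its instance-initializer block. Because
    // HashMap implements Serializable, the anonymous subclass declares its
    // own serialVersionUID, just as the diff above does for Configuration.
    static final HashMap<String, Integer> CONFIG = new HashMap<String, Integer>() {
        private static final long serialVersionUID = 1L;

        {
            put("taskmanager.numberOfTaskSlots", 4);
            put("parallelism.default", 2);
        }
    };
}
```

The review nit is purely about layout: the statements inside the inner initializer block belong one indentation level deeper than the block's braces.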
[jira] [Commented] (FLINK-5495) ZooKeeperMesosWorkerStore cannot be instantiated
[ https://issues.apache.org/jira/browse/FLINK-5495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833895#comment-15833895 ] ASF GitHub Bot commented on FLINK-5495: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3155 +1 to merge, LGTM. > ZooKeeperMesosWorkerStore cannot be instantiated > > > Key: FLINK-5495 > URL: https://issues.apache.org/jira/browse/FLINK-5495 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > > The {{ZooKeeperMesosWorkerStore}} cannot be instantiated because it > dynamically instantiates a {{ZooKeeperStateHandleStore}} without providing an > {{Executor}} to the constructor. This effectively breaks Mesos HA mode. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3155: [FLINK-5495] [mesos] Provide executor to ZooKeeperMesosWo...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3155 +1 to merge, LGTM.
[GitHub] flink pull request #3186: [FLINK-5582] [streaming] Add a general distributiv...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3186#discussion_r97246135 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; + +/** + * + * Aggregation functions must be {@link Serializable} because they are sent around + * between distributed processes during distributed execution. 
+ * + * An example of how to use this interface is below: + * + * {@code + * // the accumulator, which holds the state of the in-flight aggregate + * public class AverageAccumulator { + * long count; + * long sum; + * } + * + * // implementation of an aggregation function for an 'average' + * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> { + * + * public AverageAccumulator createAccumulator() { + * return new AverageAccumulator(); + * } + * + * public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) { + * a.count += b.count; + * a.sum += b.sum; + * return a; + * } + * + * public void add(Integer value, AverageAccumulator acc) { + * acc.sum += value; + * acc.count++; + * } + * + * public Double getResult(AverageAccumulator acc) { + * return acc.sum / (double) acc.count; + * } + * } + * + * // implementation of a weighted average + * // this reuses the same accumulator type as the aggregate function for 'average' + * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> { + * + * public AverageAccumulator createAccumulator() { + * return new AverageAccumulator(); + * } + * + * public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) { + * a.count += b.count; + * a.sum += b.sum; + * return a; + * } + * + * public void add(Datum value, AverageAccumulator acc) { + * acc.count += value.getWeight(); + * acc.sum += value.getValue(); + * } + * + * public Double getResult(AverageAccumulator acc) { + * return acc.sum / (double) acc.count; + * } + * } + * } + */ +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable { + + ACC createAccumulator(); + + void add(IN value, ACC accumulator); --- End diff -- TableAPI UDAGG will be eventually translated to this windowStream API. The accumulate and retract will be handled in this add function. I think it is OK if we "view retractions as adding negative values".
[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833830#comment-15833830 ] ASF GitHub Bot commented on FLINK-5582: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3186#discussion_r97246135 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; + +/** + * + * Aggregation functions must be {@link Serializable} because they are sent around + * between distributed processes during distributed execution. 
+ * + * An example of how to use this interface is below: + * + * {@code + * // the accumulator, which holds the state of the in-flight aggregate + * public class AverageAccumulator { + * long count; + * long sum; + * } + * + * // implementation of an aggregation function for an 'average' + * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> { + * + * public AverageAccumulator createAccumulator() { + * return new AverageAccumulator(); + * } + * + * public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) { + * a.count += b.count; + * a.sum += b.sum; + * return a; + * } + * + * public void add(Integer value, AverageAccumulator acc) { + * acc.sum += value; + * acc.count++; + * } + * + * public Double getResult(AverageAccumulator acc) { + * return acc.sum / (double) acc.count; + * } + * } + * + * // implementation of a weighted average + * // this reuses the same accumulator type as the aggregate function for 'average' + * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> { + * + * public AverageAccumulator createAccumulator() { + * return new AverageAccumulator(); + * } + * + * public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) { + * a.count += b.count; + * a.sum += b.sum; + * return a; + * } + * + * public void add(Datum value, AverageAccumulator acc) { + * acc.count += value.getWeight(); + * acc.sum += value.getValue(); + * } + * + * public Double getResult(AverageAccumulator acc) { + * return acc.sum / (double) acc.count; + * } + * } + * } + */ +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable { + + ACC createAccumulator(); + + void add(IN value, ACC accumulator); --- End diff -- TableAPI UDAGG will be eventually translated to this windowStream API. The accumulate and retract will be handled in this add function. I think it is OK if we "view retractions as adding negative values".
> Add a general distributive aggregate function > - > > Key: FLINK-5582 > URL: https://issues.apache.org/jira/browse/FLINK-5582 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > The {{DataStream}} API currently has two aggregation functions that can be > used on windows and in state, both of which have limitations: > - {{ReduceFunction}} only supports one type as the type that is added and > aggregated/returned. > - {{FoldFunction}} Supports different types to add and return, but is not > distributive, i.e. it cannot be used for hierarchical aggregation, for > example to split the aggregation into to pre- and final-aggregation. > I suggest to
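The Javadoc example discussed in this thread runs as-is once the interface is in scope; here is a self-contained sketch with the interface reproduced inline (inlined only so the snippet compiles alone — the real interface lives in `org.apache.flink.api.common.functions`):

```java
import java.io.Serializable;

// Inlined reproduction of the proposed interface, for illustration only.
interface AggregateFunction<IN, ACC, OUT> extends Serializable {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// The accumulator holds the state of the in-flight aggregate.
class AverageAccumulator {
    long count;
    long sum;
}

// The 'average' example from the Javadoc above.
class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
    public AverageAccumulator createAccumulator() { return new AverageAccumulator(); }
    public void add(Integer value, AverageAccumulator acc) { acc.sum += value; acc.count++; }
    public Double getResult(AverageAccumulator acc) { return acc.sum / (double) acc.count; }
    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
}
```

Note the separation the thread discusses: `add` folds one input into the accumulator, while `getResult` performs the finalizing division only when the result is requested.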
[jira] [Assigned] (FLINK-5546) When multiple users run test, /tmp/cacheFile conflicts
[ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui reassigned FLINK-5546: Assignee: shijinkui > When multiple users run test, /tmp/cacheFile conflicts > -- > > Key: FLINK-5546 > URL: https://issues.apache.org/jira/browse/FLINK-5546 > Project: Flink > Issue Type: Test >Affects Versions: 1.2.0, 1.3.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo >Assignee: shijinkui > > When multiple Linux users run tests at the same time, the flink-runtime module may > fail. User A creates /tmp/cacheFile, and User B will have no permission to > visit the folder. > Failed tests: > FileCacheDeleteValidationTest.setup:79 Error initializing the test: > /tmp/cacheFile (Permission denied) > Tests in error: > IOManagerTest.channelEnumerator:54 » Runtime Could not create storage > director... > Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5546) When multiple users run test, /tmp/cacheFile conflicts
[ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-5546: - Assignee: (was: shijinkui) > When multiple users run test, /tmp/cacheFile conflicts > -- > > Key: FLINK-5546 > URL: https://issues.apache.org/jira/browse/FLINK-5546 > Project: Flink > Issue Type: Test >Affects Versions: 1.2.0, 1.3.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo > > When multiple Linux users run tests at the same time, the flink-runtime module may > fail. User A creates /tmp/cacheFile, and User B will have no permission to > visit the folder. > Failed tests: > FileCacheDeleteValidationTest.setup:79 Error initializing the test: > /tmp/cacheFile (Permission denied) > Tests in error: > IOManagerTest.channelEnumerator:54 » Runtime Could not create storage > director... > Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5546) When multiple users run test, /tmp/cacheFile conflicts
[ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui reassigned FLINK-5546: Assignee: shijinkui > When multiple users run test, /tmp/cacheFile conflicts > -- > > Key: FLINK-5546 > URL: https://issues.apache.org/jira/browse/FLINK-5546 > Project: Flink > Issue Type: Test >Affects Versions: 1.2.0, 1.3.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo >Assignee: shijinkui > > When multiple Linux users run tests at the same time, the flink-runtime module may > fail. User A creates /tmp/cacheFile, and User B will have no permission to > visit the folder. > Failed tests: > FileCacheDeleteValidationTest.setup:79 Error initializing the test: > /tmp/cacheFile (Permission denied) > Tests in error: > IOManagerTest.channelEnumerator:54 » Runtime Could not create storage > director... > Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5582. - Resolution: Implemented Implemented in 09380e49256bff924734b9a932808e0f4daa7e5c > Add a general distributive aggregate function > - > > Key: FLINK-5582 > URL: https://issues.apache.org/jira/browse/FLINK-5582 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > The {{DataStream}} API currently has two aggregation functions that can be > used on windows and in state, both of which have limitations: > - {{ReduceFunction}} only supports one type as the type that is added and > aggregated/returned. > - {{FoldFunction}} Supports different types to add and return, but is not > distributive, i.e. it cannot be used for hierarchical aggregation, for > example to split the aggregation into to pre- and final-aggregation. > I suggest to add a generic and powerful aggregation function that supports: > - Different types to add, accumulate, and return > - The ability to merge partial aggregated by merging the accumulated type. > The proposed interface is below. 
This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:
> - The accumulator is the state of the running aggregate
> - Accumulators can be merged
> - Values are added to the accumulator
> - Getting the result from the accumulator performs an optional finalizing operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>     ACC createAccumulator();
>     void add(IN value, ACC accumulator);
>     OUT getResult(ACC accumulator);
>     ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
>
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
>
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Datum value, AverageAccumulator acc) {
>         acc.count += value.getWeight();
>         acc.sum += value.getValue();
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
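The "distributive" property the issue asks for comes from `merge`: pre-aggregating partitions and then merging the partial accumulators gives the same result as one pass over all the data. A stand-alone sketch of that equivalence (the class and helper names are made up for illustration):

```java
// Accumulator for a running average, as in the issue's example.
class Acc {
    long count;
    long sum;
}

class PreAggregationDemo {
    static void add(int value, Acc acc) { acc.sum += value; acc.count++; }

    static Acc merge(Acc a, Acc b) { a.count += b.count; a.sum += b.sum; return a; }

    static double getResult(Acc acc) { return acc.sum / (double) acc.count; }

    // Hierarchical aggregation: each partition is pre-aggregated into its own
    // accumulator, and the final result comes from merging the accumulators.
    static double average(int[] partition1, int[] partition2) {
        Acc a = new Acc();
        Acc b = new Acc();
        for (int v : partition1) add(v, a);
        for (int v : partition2) add(v, b);
        return getResult(merge(a, b));
    }
}
```

This is exactly what `FoldFunction` cannot do: a fold's intermediate result has no merge operation, so it cannot be split into pre- and final aggregation.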
[jira] [Closed] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5582. --- > Add a general distributive aggregate function > - > > Key: FLINK-5582 > URL: https://issues.apache.org/jira/browse/FLINK-5582 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > The {{DataStream}} API currently has two aggregation functions that can be > used on windows and in state, both of which have limitations: > - {{ReduceFunction}} only supports one type as the type that is added and > aggregated/returned. > - {{FoldFunction}} Supports different types to add and return, but is not > distributive, i.e. it cannot be used for hierarchical aggregation, for > example to split the aggregation into to pre- and final-aggregation. > I suggest to add a generic and powerful aggregation function that supports: > - Different types to add, accumulate, and return > - The ability to merge partial aggregated by merging the accumulated type. > The proposed interface is below. 
This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:
> - The accumulator is the state of the running aggregate
> - Accumulators can be merged
> - Values are added to the accumulator
> - Getting the result from the accumulator performs an optional finalizing operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>     ACC createAccumulator();
>     void add(IN value, ACC accumulator);
>     OUT getResult(ACC accumulator);
>     ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
>
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
>
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Datum value, AverageAccumulator acc) {
>         acc.count += value.getWeight();
>         acc.sum += value.getValue();
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5590) Create a proper internal state hierarchy
[ https://issues.apache.org/jira/browse/FLINK-5590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5590. --- > Create a proper internal state hierarchy > > > Key: FLINK-5590 > URL: https://issues.apache.org/jira/browse/FLINK-5590 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Currently, the state interfaces (like {{ListState}}, {{ValueState}}, > {{ReducingState}}) are very sparse and contain only methods exposed to the > users. That makes sense to keep the public stable API minimal. > At the same time, the runtime needs more methods for its internal interaction > with state, such as: > - setting namespaces > - accessing raw values > - merging namespaces > These are currently realized by re-creating or re-obtaining the state objects > from the KeyedStateBackend. That method causes quite an overhead for each > access to the state. > The KeyedStateBackend tries to do some tricks to reduce that overhead, but > does it only partially and induces other overhead in the course. > The root cause of all these issues is a problem in the design: There is no > proper "internal state abstraction" in a similar way as there is an external > state abstraction (the public state API). > We should add a similar hierarchy of states for the internal methods. It > would look like in the example below:
> {code}
>                State
>                  |
>                  +------------------InternalKvState
>                  |                         |
>            MergingState                    |
>                  |                         |
>                  +----------------InternalMergingState
>                  |                         |
>         +--------+--------+                |
>         |                 |                |
>   ReducingState       ListState            |
>         |                 |                |
>         |                 +--------InternalListState
>         |                                  |
>         +------------------------InternalReducingState
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-5590) Create a proper internal state hierarchy
[ https://issues.apache.org/jira/browse/FLINK-5590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5590. - Resolution: Fixed Fixed in 3b97128f05bacfb80afe4a2a49741c31ff306cd2 > Create a proper internal state hierarchy > > > Key: FLINK-5590 > URL: https://issues.apache.org/jira/browse/FLINK-5590 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Currently, the state interfaces (like {{ListState}}, {{ValueState}}, > {{ReducingState}}) are very sparse and contain only methods exposed to the > users. That makes sense to keep the public stable API minimal. > At the same time, the runtime needs more methods for its internal interaction > with state, such as: > - setting namespaces > - accessing raw values > - merging namespaces > These are currently realized by re-creating or re-obtaining the state objects > from the KeyedStateBackend. That method causes quite an overhead for each > access to the state. > The KeyedStateBackend tries to do some tricks to reduce that overhead, but > does it only partially and induces other overhead in the course. > The root cause of all these issues is a problem in the design: There is no > proper "internal state abstraction" in a similar way as there is an external > state abstraction (the public state API). > We should add a similar hierarchy of states for the internal methods. It > would look like in the example below:
> {code}
>                State
>                  |
>                  +------------------InternalKvState
>                  |                         |
>            MergingState                    |
>                  |                         |
>                  +----------------InternalMergingState
>                  |                         |
>         +--------+--------+                |
>         |                 |                |
>   ReducingState       ListState            |
>         |                 |                |
>         |                 +--------InternalListState
>         |                                  |
>         +------------------------InternalReducingState
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
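The public/internal split described in the issue can be illustrated with a toy example: the internal interface extends the public one and adds runtime-only operations such as namespace switching, so user code never sees them. The interface names follow the issue's diagram, but the heap-backed implementation below is a made-up illustration, not Flink's actual code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Public (user-facing) state interfaces, stripped down:
interface State {
    void clear();
}

interface ListState<T> extends State {
    void add(T value);
    Iterable<T> get();
}

// Internal counterparts add runtime-only operations without widening the public API:
interface InternalKvState<N> extends State {
    void setCurrentNamespace(N namespace);
}

interface InternalListState<N, T> extends ListState<T>, InternalKvState<N> {
}

// Toy heap-backed implementation, keyed by namespace (e.g. a window):
class HeapListState<N, T> implements InternalListState<N, T> {
    private final Map<N, List<T>> stateByNamespace = new HashMap<>();
    private N currentNamespace;

    public void setCurrentNamespace(N namespace) { currentNamespace = namespace; }

    public void add(T value) {
        stateByNamespace.computeIfAbsent(currentNamespace, k -> new ArrayList<>()).add(value);
    }

    public Iterable<T> get() {
        return stateByNamespace.getOrDefault(currentNamespace, Collections.emptyList());
    }

    public void clear() { stateByNamespace.remove(currentNamespace); }
}
```

The runtime holds the `InternalListState` view and can switch namespaces cheaply, while the same object is handed to user functions as a plain `ListState` — avoiding the re-obtain-per-access overhead the issue describes.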
[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833726#comment-15833726 ] ASF GitHub Bot commented on FLINK-5582: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3186 > Add a general distributive aggregate function > - > > Key: FLINK-5582 > URL: https://issues.apache.org/jira/browse/FLINK-5582 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > The {{DataStream}} API currently has two aggregation functions that can be > used on windows and in state, both of which have limitations: > - {{ReduceFunction}} only supports one type as the type that is added and > aggregated/returned. > - {{FoldFunction}} Supports different types to add and return, but is not > distributive, i.e. it cannot be used for hierarchical aggregation, for > example to split the aggregation into to pre- and final-aggregation. > I suggest to add a generic and powerful aggregation function that supports: > - Different types to add, accumulate, and return > - The ability to merge partial aggregated by merging the accumulated type. > The proposed interface is below. 
This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:
> - The accumulator is the state of the running aggregate
> - Accumulators can be merged
> - Values are added to the accumulator
> - Getting the result from the accumulator performs an optional finalizing operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>     ACC createAccumulator();
>     void add(IN value, ACC accumulator);
>     OUT getResult(ACC accumulator);
>     ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
>
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
>
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Datum value, AverageAccumulator acc) {
>         acc.count += value.getWeight();
>         acc.sum += value.getValue();
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3186: [FLINK-5582] [streaming] Add a general distributiv...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3186
[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833701#comment-15833701 ] ASF GitHub Bot commented on FLINK-5582: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3186 Since this requires constantly extensive merge conflict resolving with the master, I want to merge this soon. @shaoxuan-wang has tested it life and the CI pass as well... > Add a general distributive aggregate function > - > > Key: FLINK-5582 > URL: https://issues.apache.org/jira/browse/FLINK-5582 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > The {{DataStream}} API currently has two aggregation functions that can be > used on windows and in state, both of which have limitations: > - {{ReduceFunction}} only supports one type as the type that is added and > aggregated/returned. > - {{FoldFunction}} Supports different types to add and return, but is not > distributive, i.e. it cannot be used for hierarchical aggregation, for > example to split the aggregation into to pre- and final-aggregation. > I suggest to add a generic and powerful aggregation function that supports: > - Different types to add, accumulate, and return > - The ability to merge partial aggregated by merging the accumulated type. > The proposed interface is below. 
This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:
> - The accumulator is the state of the running aggregate
> - Accumulators can be merged
> - Values are added to the accumulator
> - Getting the result from the accumulator performs an optional finalizing operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>     ACC createAccumulator();
>     void add(IN value, ACC accumulator);
>     OUT getResult(ACC accumulator);
>     ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
>
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
>
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Datum value, AverageAccumulator acc) {
>         acc.count += value.getWeight();
>         acc.sum += value.getValue();
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
[GitHub] flink issue #3186: [FLINK-5582] [streaming] Add a general distributive aggre...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3186 Since this constantly requires extensive merge-conflict resolution with master, I want to merge this soon. @shaoxuan-wang has tested it live and the CI passes as well...
[jira] [Commented] (FLINK-5608) Cancel button not always visible
[ https://issues.apache.org/jira/browse/FLINK-5608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833696#comment-15833696 ] ASF GitHub Bot commented on FLINK-5608: --- GitHub user rehevkor5 opened a pull request: https://github.com/apache/flink/pull/3189 [FLINK-5608] [webfrontend] Cancel button stays visible in narrow windows - Most importantly, the Cancel button has been changed to float right, and will only wrap downward if pushed out by the job name - Also combined the job name and job id into a single horizontal element, reducing the overall horizontal space taken by the main navbar components in the job view, making the main navbar components less likely to wrap downward and be overlapped by the secondary navbar. - Compiled code has been rebuilt Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. 
- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rehevkor5/flink ui_job_cancel_button_layout_issue
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3189.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3189

commit 33e3a14ebd3d614a6e5d4eae90f5985416e0b995
Author: Shannon Carey
Date: 2017-01-22T20:52:43Z

[FLINK-5608] [webfrontend] Cancel button stays visible in narrow windows
- Most importantly, the Cancel button has been changed to float right, and will only wrap downward if pushed out by the job name
- Also combined the job name and job id into a single horizontal element, reducing the overall horizontal space taken by the main navbar components in the job view, making the main navbar components less likely to wrap downward and be overlapped by the secondary navbar.
- Compiled code has been rebuilt
> Cancel button not always visible
> --------------------------------
>
> Key: FLINK-5608
> URL: https://issues.apache.org/jira/browse/FLINK-5608
> Project: Flink
> Issue Type: Bug
> Components: Webfrontend
> Affects Versions: 1.1.4
> Reporter: Shannon Carey
> Assignee: Shannon Carey
> Priority: Minor
>
> When the window is not wide enough, or when the job name is too long, the "Cancel" button in the Job view of the web UI is not visible because it is the first element that gets wrapped down and gets covered by the secondary navbar (the tabs). This causes us to often need to resize the browser wider than our monitor in order to use the cancel button.
> In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the content may wrap, especially if the content's horizontal width is not known & fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any unexpected change in height will result in overlap with the rest of the normal-flow content in the page. The Bootstrap docs explain this in their "Overflowing content" callout.
> I am submitting a PR which does not attempt to resolve all issues with the fixed navbar approach, but attempts to improve the situation by using less horizontal space and by altering the layout approach of the Cancel button.
[GitHub] flink pull request #3189: [FLINK-5608] [webfrontend] Cancel button stays vis...
GitHub user rehevkor5 opened a pull request: https://github.com/apache/flink/pull/3189 [FLINK-5608] [webfrontend] Cancel button stays visible in narrow windows - Most importantly, the Cancel button has been changed to float right, and will only wrap downward if pushed out by the job name - Also combined the job name and job id into a single horizontal element, reducing the overall horizontal space taken by the main navbar components in the job view, making the main navbar components less likely to wrap downward and be overlapped by the secondary navbar. - Compiled code has been rebuilt Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. 
- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rehevkor5/flink ui_job_cancel_button_layout_issue
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3189.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3189

commit 33e3a14ebd3d614a6e5d4eae90f5985416e0b995
Author: Shannon Carey
Date: 2017-01-22T20:52:43Z

[FLINK-5608] [webfrontend] Cancel button stays visible in narrow windows
- Most importantly, the Cancel button has been changed to float right, and will only wrap downward if pushed out by the job name
- Also combined the job name and job id into a single horizontal element, reducing the overall horizontal space taken by the main navbar components in the job view, making the main navbar components less likely to wrap downward and be overlapped by the secondary navbar.
- Compiled code has been rebuilt
[jira] [Created] (FLINK-5608) Cancel button not always visible
Shannon Carey created FLINK-5608:

Summary: Cancel button not always visible
Key: FLINK-5608
URL: https://issues.apache.org/jira/browse/FLINK-5608
Project: Flink
Issue Type: Bug
Components: Webfrontend
Affects Versions: 1.1.4
Reporter: Shannon Carey
Assignee: Shannon Carey
Priority: Minor

When the window is not wide enough, or when the job name is too long, the "Cancel" button in the Job view of the web UI is not visible because it is the first element that gets wrapped down and gets covered by the secondary navbar (the tabs). This causes us to often need to resize the browser wider than our monitor in order to use the cancel button. In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the content may wrap, especially if the content's horizontal width is not known & fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any unexpected change in height will result in overlap with the rest of the normal-flow content in the page. The Bootstrap docs explain this in their "Overflowing content" callout. I am submitting a PR which does not attempt to resolve all issues with the fixed navbar approach, but attempts to improve the situation by using less horizontal space and by altering the layout approach of the Cancel button.
[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833675#comment-15833675 ] ASF GitHub Bot commented on FLINK-5582: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3186 @shaoxuan-wang I cannot reproduce the compile error you posted. The latest commit also gets a green light from Travis CI: https://travis-ci.org/StephanEwen/incubator-flink/builds/194239397
> Add a general distributive aggregate function
> ---------------------------------------------
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
> The {{DataStream}} API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:
> - {{ReduceFunction}} only supports one type as the type that is added and aggregated/returned.
> - {{FoldFunction}} supports different types to add and return, but is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into pre- and final-aggregation.
> I suggest to add a generic and powerful aggregation function that supports:
> - Different types to add, accumulate, and return
> - The ability to merge partial aggregates by merging the accumulator type.
> The proposed interface is below.
This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:
> - The accumulator is the state of the running aggregate
> - Accumulators can be merged
> - Values are added to the accumulator
> - Getting the result from the accumulator performs an optional finalizing operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>     ACC createAccumulator();
>     void add(IN value, ACC accumulator);
>     OUT getResult(ACC accumulator);
>     ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
>
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
>
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Datum value, AverageAccumulator acc) {
>         acc.count += value.getWeight();
>         acc.sum += value.getValue();
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
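The hierarchical (pre-/final-) aggregation that motivates the issue can be illustrated in plain Java: each partition builds its own accumulator, and `merge()` combines them, yielding the same result as aggregating all values at once. The `HierarchicalAverage` class below is a hypothetical illustration, not Flink runtime code:

```java
// Hypothetical plain-Java illustration of distributive aggregation:
// each partition pre-aggregates into its own accumulator, and merge()
// performs the final aggregation across partitions.
public class HierarchicalAverage {
    static class Acc {
        long count;
        long sum;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    static void add(int value, Acc acc) {
        acc.sum += value;
        acc.count++;
    }

    static Acc merge(Acc a, Acc b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    static double getResult(Acc acc) {
        return acc.sum / (double) acc.count;
    }

    public static void main(String[] args) {
        int[][] partitions = { {1, 2, 3}, {4, 5}, {6} };

        // pre-aggregation: one accumulator per partition
        Acc total = createAccumulator();
        for (int[] partition : partitions) {
            Acc partial = createAccumulator();
            for (int v : partition) {
                add(v, partial);
            }
            total = merge(total, partial); // final aggregation
        }
        System.out.println(getResult(total)); // prints 3.5
    }
}
```

This is exactly what `FoldFunction` cannot do: a folded partial result of type `OUT` (here, a `double` average) cannot be combined with another partial result, whereas the accumulator retains enough state to merge.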
[GitHub] flink issue #3186: [FLINK-5582] [streaming] Add a general distributive aggre...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3186 @shaoxuan-wang I cannot reproduce the compile error you posted. The latest commit also gets a green light from Travis CI: https://travis-ci.org/StephanEwen/incubator-flink/builds/194239397
[jira] [Resolved] (FLINK-5454) Add Documentation about how to tune Checkpointing for large state
[ https://issues.apache.org/jira/browse/FLINK-5454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5454. - Resolution: Fixed Fixed in - 1.2.0 via 099cdd01013c6959d3785088204601b926c04193 - 1.3.0 via 392b2e9a0595a947a24e1504cbfd5f9cabc1c86a > Add Documentation about how to tune Checkpointing for large state > - > > Key: FLINK-5454 > URL: https://issues.apache.org/jira/browse/FLINK-5454 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0, 1.3.0 > >
[jira] [Closed] (FLINK-5454) Add Documentation about how to tune Checkpointing for large state
[ https://issues.apache.org/jira/browse/FLINK-5454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5454. --- > Add Documentation about how to tune Checkpointing for large state > - > > Key: FLINK-5454 > URL: https://issues.apache.org/jira/browse/FLINK-5454 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.2.0, 1.3.0 > >
[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833605#comment-15833605 ] ASF GitHub Bot commented on FLINK-5582: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3186#discussion_r97227451 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; + +/** + * + * Aggregation functions must be {@link Serializable} because they are sent around + * between distributed processes during distributed execution. 
+ *
+ * An example how to use this interface is below:
+ *
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ *     long count;
+ *     long sum;
+ * }
+ *
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Integer value, AverageAccumulator acc) {
+ *         acc.sum += value;
+ *         acc.count++;
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ *
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Datum value, AverageAccumulator acc) {
+ *         acc.count += value.getWeight();
+ *         acc.sum += value.getValue();
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+    ACC createAccumulator();
+
+    void add(IN value, ACC accumulator);
--- End diff --

My first feeling is to keep the name `add()` because it fits better together with the term `Accumulator`. One can view retractions as adding negative values. What do you think about that?
> Add a general distributive aggregate function
> ---------------------------------------------
>
> Key: FLINK-5582
> URL: https://issues.apache.org/jira/browse/FLINK-5582
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
> The {{DataStream}} API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:
> - {{ReduceFunction}} only supports one type as the type that is added and aggregated/returned.
> - {{FoldFunction}} supports different types to add and return, but is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into pre- and final-aggregation.
> I suggest to add a generic and
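Stephan's remark in the review above — that a retraction can be viewed as adding a negative value, so no separate `retract()` method is needed — holds for sum-like aggregates. A hypothetical plain-Java sketch (the `SumRetraction` class is an illustration, not code from the PR):

```java
// Hypothetical sketch: for a sum aggregate, retracting a previously
// added value is simply add(-value), so the add()/merge() interface
// already covers retractions without an extra method.
public class SumRetraction {
    static class SumAcc {
        long sum;
    }

    static void add(long value, SumAcc acc) {
        acc.sum += value;
    }

    public static void main(String[] args) {
        SumAcc acc = new SumAcc();
        add(10, acc);
        add(7, acc);
        add(-7, acc);                // retraction of the earlier 7
        System.out.println(acc.sum); // prints 10
    }
}
```

Note the caveat implicit in the discussion: this works when the accumulator update is invertible by negation (sums, counts); an average accumulator would also need its count decremented, which is why the question of Table API retractions came up at all.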
[GitHub] flink pull request #3186: [FLINK-5582] [streaming] Add a general distributiv...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3186#discussion_r97227451 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; + +/** + * + * Aggregation functions must be {@link Serializable} because they are sent around + * between distributed processes during distributed execution. 
+ *
+ * An example how to use this interface is below:
+ *
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ *     long count;
+ *     long sum;
+ * }
+ *
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Integer value, AverageAccumulator acc) {
+ *         acc.sum += value;
+ *         acc.count++;
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ *
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Datum value, AverageAccumulator acc) {
+ *         acc.count += value.getWeight();
+ *         acc.sum += value.getValue();
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+    ACC createAccumulator();
+
+    void add(IN value, ACC accumulator);
--- End diff --

My first feeling is to keep the name `add()` because it fits better together with the term `Accumulator`. One can view retractions as adding negative values. What do you think about that?
[jira] [Commented] (FLINK-5562) Driver fixes
[ https://issues.apache.org/jira/browse/FLINK-5562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833603#comment-15833603 ] ASF GitHub Bot commented on FLINK-5562: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3188 Looks good. I would like to merge this tomorrow together with #3187 (which is time critical for the release). > Driver fixes > > > Key: FLINK-5562 > URL: https://issues.apache.org/jira/browse/FLINK-5562 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.2.0 > > > Improve parametrization and output formatting.
[jira] [Commented] (FLINK-5562) Driver fixes
[ https://issues.apache.org/jira/browse/FLINK-5562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833601#comment-15833601 ] ASF GitHub Bot commented on FLINK-5562: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3187 Changes look good to me. As per Robert's email on the mailing list, he is shooting for a new release candidate very soon, so we would need to merge this before tomorrow, if possible. Will merge this tomorrow morning unless someone raises an objection till then... > Driver fixes > > > Key: FLINK-5562 > URL: https://issues.apache.org/jira/browse/FLINK-5562 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.2.0 > > > Improve parametrization and output formatting.
[jira] [Updated] (FLINK-5376) Misleading log statements in UnorderedStreamElementQueue
[ https://issues.apache.org/jira/browse/FLINK-5376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5376:
Description:
The following are two examples where ordered stream element queue is mentioned:
{code}
LOG.debug("Put element into ordered stream element queue. New filling degree " +
    "({}/{}).", numberEntries, capacity);

    return true;
} else {
    LOG.debug("Failed to put element into ordered stream element queue because it " +
{code}
I guess OrderedStreamElementQueue was coded first.

was:
The following are two examples where ordered stream element queue is mentioned:
{code}
LOG.debug("Put element into ordered stream element queue. New filling degree " +
    "({}/{}).", numberEntries, capacity);

    return true;
} else {
    LOG.debug("Failed to put element into ordered stream element queue because it " +
{code}
I guess OrderedStreamElementQueue was coded first.

> Misleading log statements in UnorderedStreamElementQueue
> --------------------------------------------------------
>
> Key: FLINK-5376
> URL: https://issues.apache.org/jira/browse/FLINK-5376
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Yu
> Priority: Minor
>
> The following are two examples where ordered stream element queue is mentioned:
> {code}
> LOG.debug("Put element into ordered stream element queue. New filling degree " +
>     "({}/{}).", numberEntries, capacity);
>
>     return true;
> } else {
>     LOG.debug("Failed to put element into ordered stream element queue because it " +
> {code}
> I guess OrderedStreamElementQueue was coded first.
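The fix is presumably to change "ordered" to "unordered" in the messages that were copied from OrderedStreamElementQueue. A runnable sketch of the corrected message text, using a tiny stand-in for slf4j's `{}` placeholder substitution so it needs no Flink or logging dependencies (the `fmt` helper is an assumption for illustration only):

```java
// Sketch of the corrected messages for UnorderedStreamElementQueue.
// fmt() is a minimal stand-in for slf4j-style "{}" substitution,
// added here only to keep the example self-contained.
public class LogMessageFix {
    static String fmt(String msg, Object... args) {
        for (Object arg : args) {
            // replace the first literal "{}" with the next argument
            msg = msg.replaceFirst("\\{\\}", String.valueOf(arg));
        }
        return msg;
    }

    public static void main(String[] args) {
        int numberEntries = 3;
        int capacity = 8;
        String corrected = fmt(
            "Put element into unordered stream element queue. New filling degree ({}/{}).",
            numberEntries, capacity);
        System.out.println(corrected);
        // prints: Put element into unordered stream element queue. New filling degree (3/8).
    }
}
```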
[GitHub] flink issue #3188: [FLINK-5562] [gelly] Driver fixes
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3188 Looks good. I would like to merge this tomorrow together with #3187 (which is time critical for the release).
[GitHub] flink issue #3187: [FLINK-5562] [gelly] Driver fixes for release-1.2
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3187 Changes look good to me. As per Robert's email on the mailing list, he is shooting for a new release candidate very soon, so we would need to merge this before tomorrow, if possible. Will merge this tomorrow morning unless someone raises an objection till then...
[jira] [Updated] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext
[ https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-4848:
Description:
{code}
String keystoreFilePath = sslConfig.getString(
    ConfigConstants.SECURITY_SSL_KEYSTORE,
    null);
...
try {
    keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}
If keystoreFilePath is null, the File ctor would throw NPE.

was:
{code}
String keystoreFilePath = sslConfig.getString(
    ConfigConstants.SECURITY_SSL_KEYSTORE,
    null);
...
try {
    keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}
If keystoreFilePath is null, the File ctor would throw NPE.

> keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext
> ----------------------------------------------------------------------------------
>
> Key: FLINK-4848
> URL: https://issues.apache.org/jira/browse/FLINK-4848
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Yu
> Priority: Minor
>
> {code}
> String keystoreFilePath = sslConfig.getString(
>     ConfigConstants.SECURITY_SSL_KEYSTORE,
>     null);
> ...
> try {
>     keyStoreFile = new FileInputStream(new File(keystoreFilePath));
> {code}
> If keystoreFilePath is null, the File ctor would throw NPE.
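The suggested guard can be sketched in plain Java. Note that `openKeystore`, the exception type, and the config-key string in the message are stand-ins for illustration, not Flink's actual `SSLUtils` code:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

// Hypothetical sketch of the null guard: fail with a clear configuration
// error instead of letting `new File(null)` throw a bare NullPointerException.
public class KeystoreCheck {
    static FileInputStream openKeystore(String keystoreFilePath) throws FileNotFoundException {
        if (keystoreFilePath == null) {
            // The config key named here is an assumption for the error message.
            throw new IllegalArgumentException(
                "SSL keystore path is not configured (security.ssl.keystore)");
        }
        return new FileInputStream(new File(keystoreFilePath));
    }

    public static void main(String[] args) throws FileNotFoundException {
        try {
            openKeystore(null);
        } catch (IllegalArgumentException e) {
            // a descriptive error instead of an NPE from the File constructor
            System.out.println(e.getMessage());
        }
    }
}
```

Without the guard, `new File((String) null)` throws a `NullPointerException` per the `java.io.File` constructor contract, which gives the operator no hint that a config value is missing.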
[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833598#comment-15833598 ] ASF GitHub Bot commented on FLINK-5582: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3186#discussion_r97227303

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * Aggregation functions must be {@link Serializable} because they are sent around
+ * between distributed processes during distributed execution.
+ *
+ * An example how to use this interface is below:
+ *
+ * {@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ *     long count;
+ *     long sum;
+ * }
+ *
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Integer value, AverageAccumulator acc) {
+ *         acc.sum += value;
+ *         acc.count++;
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ *
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Datum value, AverageAccumulator acc) {
+ *         acc.count += value.getWeight();
+ *         acc.sum += value.getValue();
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ * }
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+	ACC createAccumulator();
+
+	void add(IN value, ACC accumulator);
--- End diff --

Retractions are specific to the Table API. Do you expect this same interface to be used for user-defined aggregations there as well?
> Add a general distributive aggregate function > - > > Key: FLINK-5582 > URL: https://issues.apache.org/jira/browse/FLINK-5582 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > The {{DataStream}} API currently has two aggregation functions that can be > used on windows and in state, both of which have limitations: > - {{ReduceFunction}} only supports one type as the type that is added and > aggregated/returned. > - {{FoldFunction}} Supports different types to add and return, but is not > distributive, i.e. it cannot be used for hierarchical aggregation, for > example to split the aggregation into to pre- and final-aggregation. > I suggest to add a generic and powerful aggregation function that supports: > -
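For reference, the `Average` example from the Javadoc above compiles and runs standalone. The interface below is a local stand-in for the proposed `org.apache.flink.api.common.functions.AggregateFunction` (with the generic parameters `<IN, ACC, OUT>` that the mail archive stripped), not the Flink class itself:

```java
import java.io.Serializable;

// Local stand-in for the proposed interface; the real one also extends Function.
interface AggregateFunction<IN, ACC, OUT> extends Serializable {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// The accumulator, which holds the state of the in-flight aggregate.
class AverageAccumulator {
    long count;
    long sum;
}

// The 'average' example from the Javadoc, unchanged apart from formatting.
class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }
    public void add(Integer value, AverageAccumulator acc) {
        acc.sum += value;
        acc.count++;
    }
    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
}
```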
[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833589#comment-15833589 ] ASF GitHub Bot commented on FLINK-5582: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3186#discussion_r97227249

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java ---
(same AggregateFunction.java diff as quoted in the previous comment, here attached to the line:)
+	ACC merge(ACC a, ACC b);
--- End diff --

This could be done, true. It would currently mean a slight overhead (for list creation), but that is probably okay.
I would like to address that change in a separate pull request: We should probably adjust the merging state implementation as well, to exploit that new signature.
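The list-based merge under discussion can be sketched as a fold over the existing binary merge; `mergeAll` is a hypothetical helper for illustration, not part of the PR. A specialized `ACC merge(List<ACC>)` could avoid the intermediate work of repeated pairwise merging, which is the efficiency argument made in the review thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class MergeUtil {
    // Fold a non-empty list of accumulators into one using the binary merge.
    // An implementation given the whole list at once could merge in bulk
    // instead of building one intermediate result per element.
    static <ACC> ACC mergeAll(List<ACC> accumulators, BinaryOperator<ACC> merge) {
        ACC result = accumulators.get(0);
        for (int i = 1; i < accumulators.size(); i++) {
            result = merge.apply(result, accumulators.get(i));
        }
        return result;
    }
}
```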
[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833581#comment-15833581 ] ASF GitHub Bot commented on FLINK-5582: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3186

Update: The first version had an issue with binary compatibility in the Scala DataStream API: in Flink 1.0, the Scala API accidentally exposed a public method that takes an internal type as a parameter (an internal utility method). That should not have been the case, since compatibility cannot be guaranteed for a method whose parameter is a non-public type. Unfortunately, the automatic tooling for binary compatibility checks is not flexible enough to work around that: exclusions for methods do not work if parameter types were altered. Due to that, the newer version rolls back the cleanup commit that renames the internal `AggregateFunction`.

> Add a general distributive aggregate function
> ---------------------------------------------
>
>                 Key: FLINK-5582
>                 URL: https://issues.apache.org/jira/browse/FLINK-5582
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
> The {{DataStream}} API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:
> - {{ReduceFunction}} only supports one type as the type that is added and aggregated/returned.
> - {{FoldFunction}} supports different types to add and return, but is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into pre- and final-aggregation.
> I suggest to add a generic and powerful aggregation function that supports:
> - Different types to add, accumulate, and return
> - The ability to merge partial aggregates by merging the accumulator type.
> The proposed interface is below. This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:
> - The accumulator is the state of the running aggregate
> - Accumulators can be merged
> - Values are added to the accumulator
> - Getting the result from the accumulator performs an optional finalizing operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>
>     ACC createAccumulator();
>
>     void add(IN value, ACC accumulator);
>
>     OUT getResult(ACC accumulator);
>
>     ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
>
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
>
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>
>     public void add(Datum value, AverageAccumulator acc) {
>         acc.count += value.getWeight();
>         acc.sum += value.getValue();
>     }
>
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
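The distributivity property described in the issue (pre-aggregate each partition, then merge the partial accumulators, and get the same result as a single pass) can be checked directly with the average example. This is a self-contained sketch using plain static methods rather than the proposed interface:

```java
import java.util.Arrays;
import java.util.List;

class DistributiveDemo {
    // Accumulator for the running average, as in the issue description.
    static class Acc { long count; long sum; }

    static Acc create() { return new Acc(); }
    static void add(int value, Acc acc) { acc.sum += value; acc.count++; }
    static Acc merge(Acc a, Acc b) { a.count += b.count; a.sum += b.sum; return a; }
    static double getResult(Acc acc) { return acc.sum / (double) acc.count; }

    // Pre-aggregate each partition, then merge the partials: the hierarchical
    // (pre- and final-) aggregation that FoldFunction cannot express.
    static double averageOfPartitions(List<int[]> partitions) {
        Acc total = create();
        for (int[] partition : partitions) {
            Acc partial = create();                 // pre-aggregation
            for (int v : partition) add(v, partial);
            total = merge(total, partial);          // final aggregation
        }
        return getResult(total);
    }
}
```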
[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833577#comment-15833577 ] ASF GitHub Bot commented on FLINK-5582: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3186#discussion_r97226958

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java ---
(same AggregateFunction.java diff as quoted in the first comment, here attached to the line:)
+	ACC merge(ACC a, ACC b);
--- End diff --

Do you think it is useful to extend the merge function to accept a list of ACC: ACC merge(List<ACC> a)? There are cases where merging a list of instances as a group is much more efficient than merging only two instances.
[jira] [Commented] (FLINK-5582) Add a general distributive aggregate function
[ https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833576#comment-15833576 ] ASF GitHub Bot commented on FLINK-5582: --- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3186#discussion_r97226914

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java ---
(same AggregateFunction.java diff as quoted in the first comment, here attached to the line:)
+	void add(IN value, ACC accumulator);
--- End diff --

As proposed in https://goo.gl/00ea5j, this function will not only handle the accumulate, but also handle the retract. Instead of "add", can you please consider using "update"?
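The accumulate/retract distinction raised in the comment above can be illustrated with a hypothetical retractable average. This sketches the Table API idea only; neither the class nor the `retract` method is part of the interface in this PR.

```java
// Hypothetical accumulator supporting retraction: removing a previously
// added value, which updating/late results in the Table API would need.
class RetractableAverage {
    private long count;
    private long sum;

    public void accumulate(long value) { sum += value; count++; }

    // Undo a previous accumulate of the same value.
    public void retract(long value) { sum -= value; count--; }

    public double getValue() { return sum / (double) count; }
}
```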
[GitHub] flink pull request #3186: [FLINK-5582] [streaming] dd a general distributive...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3186#discussion_r97226914 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; + +/** + * + * Aggregation functions must be {@link Serializable} because they are sent around + * between distributed processes during distributed execution. 
+ * + * An example of how to use this interface is below: + * + * {@code + * // the accumulator, which holds the state of the in-flight aggregate + * public class AverageAccumulator { + * long count; + * long sum; + * } + * + * // implementation of an aggregation function for an 'average' + * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> { + * + * public AverageAccumulator createAccumulator() { + * return new AverageAccumulator(); + * } + * + * public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) { + * a.count += b.count; + * a.sum += b.sum; + * return a; + * } + * + * public void add(Integer value, AverageAccumulator acc) { + * acc.sum += value; + * acc.count++; + * } + * + * public Double getResult(AverageAccumulator acc) { + * return acc.sum / (double) acc.count; + * } + * } + * + * // implementation of a weighted average + * // this reuses the same accumulator type as the aggregate function for 'average' + * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> { + * + * public AverageAccumulator createAccumulator() { + * return new AverageAccumulator(); + * } + * + * public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) { + * a.count += b.count; + * a.sum += b.sum; + * return a; + * } + * + * public void add(Datum value, AverageAccumulator acc) { + * acc.count += value.getWeight(); + * acc.sum += value.getValue(); + * } + * + * public Double getResult(AverageAccumulator acc) { + * return acc.sum / (double) acc.count; + * } + * } + * } + */ +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable { + + ACC createAccumulator(); + + void add(IN value, ACC accumulator); --- End diff -- As proposed in https://goo.gl/00ea5j, this function will not only handle accumulation, but also retraction. Instead of "add", can you please consider using "update". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
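The retraction behaviour the reviewer asks about can be sketched as a hypothetical extension of the average accumulator discussed in this thread; `retract` is an assumed method name and is not part of the interface in this diff.

```java
// Hypothetical accumulator supporting both accumulate and retract, along the
// lines the review comment suggests; not part of the interface in this PR.
class RetractableAverage {
    static final class Acc { long count; long sum; }

    static void add(Acc acc, long value) { acc.sum += value; acc.count++; }

    // retract undoes a previous add, e.g. when a row leaves a window; this is
    // exactly what a pure add-only aggregate cannot express
    static void retract(Acc acc, long value) { acc.sum -= value; acc.count--; }

    static double result(Acc acc) { return acc.sum / (double) acc.count; }

    public static void main(String[] args) {
        Acc acc = new Acc();
        add(acc, 2); add(acc, 4); add(acc, 6);
        retract(acc, 6); // the value 6 leaves the window again
        System.out.println(result(acc)); // 3.0
    }
}
```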
[jira] [Closed] (FLINK-5576) extend deserialization functions of KvStateRequestSerializer to detect unconsumed bytes
[ https://issues.apache.org/jira/browse/FLINK-5576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-5576. -- Resolution: Fixed Fix Version/s: 1.3.0 1.2.0 Fixed in 7a0e3d6 (release-1.2), 21742b2 (master). > extend deserialization functions of KvStateRequestSerializer to detect > unconsumed bytes > --- > > Key: FLINK-5576 > URL: https://issues.apache.org/jira/browse/FLINK-5576 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > Fix For: 1.2.0, 1.3.0 > > > KvStateRequestSerializer#deserializeValue and > KvStateRequestSerializer#deserializeList both deserialize a given byte array. > This is used by clients and unit tests and it is fair to assume that these > byte arrays represent a complete value since we do not offer a method to > continue reading from the middle of the array anyway. Therefore, we can treat > unconsumed bytes as errors, e.g. from a wrong serializer being used, and > throw an IOException with an appropriate failure message. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
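The check the ticket describes can be sketched with a stand-alone deserializer (an illustration, not the actual KvStateRequestSerializer code): after reading a complete value, any leftover bytes are reported as an error.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Illustration of the described check, not the actual Flink code: after a
// complete value has been deserialized, unconsumed trailing bytes indicate
// that the wrong serializer was used, so we fail with a clear message.
class StrictDeserializer {
    static long deserializeLong(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        long value = in.readLong();
        if (in.available() > 0) {
            throw new IOException("Found " + in.available()
                    + " unconsumed byte(s); this indicates a mismatch in the serializers used.");
        }
        return value;
    }
}
```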
[jira] [Closed] (FLINK-5559) queryable state: KvStateRequestSerializer#deserializeKeyAndNamespace() throws an IOException without own failure message if deserialisation fails
[ https://issues.apache.org/jira/browse/FLINK-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-5559. -- Resolution: Fixed Fix Version/s: 1.3.0 1.2.0 Fixed in ef13f48 fd63981 f592d4c (release-1.2), c1c6ef1 3fe2cf5 563c3a4 (master). > queryable state: KvStateRequestSerializer#deserializeKeyAndNamespace() throws > an IOException without own failure message if deserialisation fails > - > > Key: FLINK-5559 > URL: https://issues.apache.org/jira/browse/FLINK-5559 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > Fix For: 1.2.0, 1.3.0 > > > KvStateRequestSerializer#deserializeKeyAndNamespace() throws an IOException, > e.g. EOFException, if the deserialisation fails, e.g. there are not enough > available bytes. > In these cases, it should instead also throw an IllegalArgumentException with > a message containing "This indicates a mismatch in the key/namespace > serializers used by the KvState instance and this access." as the other error > cases. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
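The fix direction described above can be sketched as follows (illustrative, not Flink's actual code): the low-level EOFException is translated into an IllegalArgumentException carrying the same hint as the other error cases.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Sketch of the described fix, not Flink's actual code: a low-level
// EOFException/IOException during deserialization is translated into an
// IllegalArgumentException with the same hint as the other error cases.
class KeyNamespaceDeserializer {
    static int deserializeKey(byte[] bytes) {
        try {
            return new DataInputStream(new ByteArrayInputStream(bytes)).readInt();
        } catch (IOException e) { // e.g. EOFException: not enough available bytes
            throw new IllegalArgumentException(
                    "Unable to deserialize key. This indicates a mismatch in the key/namespace "
                            + "serializers used by the KvState instance and this access.", e);
        }
    }
}
```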
[jira] [Commented] (FLINK-5576) extend deserialization functions of KvStateRequestSerializer to detect unconsumed bytes
[ https://issues.apache.org/jira/browse/FLINK-5576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833539#comment-15833539 ] ASF GitHub Bot commented on FLINK-5576: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3174 > extend deserialization functions of KvStateRequestSerializer to detect > unconsumed bytes > --- > > Key: FLINK-5576 > URL: https://issues.apache.org/jira/browse/FLINK-5576 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > > KvStateRequestSerializer#deserializeValue and > KvStateRequestSerializer#deserializeList both deserialize a given byte array. > This is used by clients and unit tests and it is fair to assume that these > byte arrays represent a complete value since we do not offer a method to > continue reading from the middle of the array anyway. Therefore, we can treat > unconsumed bytes as errors, e.g. from a wrong serializer being used, and > throw an IOException with an appropriate failure message. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5528) tests: reduce the retry delay in QueryableStateITCase
[ https://issues.apache.org/jira/browse/FLINK-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833538#comment-15833538 ] ASF GitHub Bot commented on FLINK-5528: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3139 > tests: reduce the retry delay in QueryableStateITCase > - > > Key: FLINK-5528 > URL: https://issues.apache.org/jira/browse/FLINK-5528 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > Fix For: 1.2.0, 1.3.0 > > > The QueryableStateITCase uses a retry of 1 second, e.g. if a queried key does > not exist yet. This seems a bit too conservative as the job may not take that > long to deploy and especially since getKvStateWithRetries() recovers from > failures by retrying. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5559) queryable state: KvStateRequestSerializer#deserializeKeyAndNamespace() throws an IOException without own failure message if deserialisation fails
[ https://issues.apache.org/jira/browse/FLINK-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833540#comment-15833540 ] ASF GitHub Bot commented on FLINK-5559: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3172 > queryable state: KvStateRequestSerializer#deserializeKeyAndNamespace() throws > an IOException without own failure message if deserialisation fails > - > > Key: FLINK-5559 > URL: https://issues.apache.org/jira/browse/FLINK-5559 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > > KvStateRequestSerializer#deserializeKeyAndNamespace() throws an IOException, > e.g. EOFException, if the deserialisation fails, e.g. there are not enough > available bytes. > In these cases, it should instead also throw an IllegalArgumentException with > a message containing "This indicates a mismatch in the key/namespace > serializers used by the KvState instance and this access." as the other error > cases. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3172: [FLINK-5559] let KvStateRequestSerializer#deserial...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3172 ---
[GitHub] flink pull request #3174: [FLINK-5576] extend deserialization functions of K...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3174 ---
[GitHub] flink pull request #3139: [FLINK-5528][query][tests] reduce the retry delay ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3139 ---
[jira] [Created] (FLINK-5607) Move location lookup retry out of KvStateLocationLookupService
Ufuk Celebi created FLINK-5607: -- Summary: Move location lookup retry out of KvStateLocationLookupService Key: FLINK-5607 URL: https://issues.apache.org/jira/browse/FLINK-5607 Project: Flink Issue Type: Improvement Components: Queryable State Reporter: Ufuk Celebi Assignee: Ufuk Celebi If a state location lookup fails because of an {{UnknownJobManager}}, the lookup service will automagically retry this. I think it's better to move this out of the lookup service and have the retry handled by the caller. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
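The proposed direction can be sketched as follows: the lookup service stays retry-free, and callers wrap it in a small generic retry helper. Names here are illustrative, not the Flink API.

```java
import java.util.concurrent.Callable;

// Sketch of caller-side retry, as proposed: the lookup itself does not retry;
// the caller wraps it in a helper like this one. Illustrative names only.
class Retry {
    static <T> T withRetries(Callable<T> action, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // e.g. a transient UnknownJobManager-style failure
            }
        }
        throw last; // give up after maxAttempts
    }
}
```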
[jira] [Created] (FLINK-5606) Remove magic number in key and namespace serialization
Ufuk Celebi created FLINK-5606: -- Summary: Remove magic number in key and namespace serialization Key: FLINK-5606 URL: https://issues.apache.org/jira/browse/FLINK-5606 Project: Flink Issue Type: Improvement Components: Queryable State Reporter: Ufuk Celebi Assignee: Ufuk Celebi Priority: Minor The serialized key and namespace for state queries contains a magic number between the key and namespace: {{key|42|namespace}}. This was for historical reasons in order to skip deserialization of the key and namespace for our old {{RocksDBStateBackend}} which used the same format. This has now been superseded by the keygroup aware state backends and there is no point in doing this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
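The two byte layouts described above can be illustrated with a stand-alone sketch (purely illustrative, using int keys and namespaces): the legacy layout writes a magic number between key and namespace ({{key|42|namespace}}); the simplified layout drops it.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustration of the two layouts described in the ticket, not Flink's code:
// the legacy layout writes a magic number between key and namespace;
// the simplified layout omits it.
class KeyNamespaceFormat {
    static final int MAGIC_NUMBER = 42;

    static byte[] serializeLegacy(int key, int namespace) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(key);
        out.writeByte(MAGIC_NUMBER); // historical marker, no longer needed
        out.writeInt(namespace);
        return bos.toByteArray();
    }

    static byte[] serialize(int key, int namespace) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(key);
        out.writeInt(namespace); // no magic number in between
        return bos.toByteArray();
    }
}
```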
[jira] [Assigned] (FLINK-5605) Make KvStateRequestSerializer package private
[ https://issues.apache.org/jira/browse/FLINK-5605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi reassigned FLINK-5605: -- Assignee: Ufuk Celebi > Make KvStateRequestSerializer package private > -- > > Key: FLINK-5605 > URL: https://issues.apache.org/jira/browse/FLINK-5605 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi >Priority: Minor > > From early users I've seen that many people use the KvStateRequestSerializer > in their programs. This was actually meant as an internal package to be used > by the client and server for internal message serialization. > I vote to make this package private and create an explicit > {{QueryableStateClientUtil}} for user serialization. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5605) Make KvStateRequestSerializer package private
Ufuk Celebi created FLINK-5605: -- Summary: Make KvStateRequestSerializer package private Key: FLINK-5605 URL: https://issues.apache.org/jira/browse/FLINK-5605 Project: Flink Issue Type: Improvement Components: Queryable State Reporter: Ufuk Celebi Priority: Minor From early users I've seen that many people use the KvStateRequestSerializer in their programs. This was actually meant as an internal package to be used by the client and server for internal message serialization. I vote to make this package private and create an explicit {{QueryableStateClientUtil}} for user serialization. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5604) Replace QueryableStateClient constructor
Ufuk Celebi created FLINK-5604: -- Summary: Replace QueryableStateClient constructor Key: FLINK-5604 URL: https://issues.apache.org/jira/browse/FLINK-5604 Project: Flink Issue Type: Improvement Components: Queryable State Reporter: Ufuk Celebi Assignee: Ufuk Celebi The {{QueryableStateClient}} constructor expects a configuration object which makes it very hard for users to see what's expected to be there and what not. I propose to split this constructor up and add some static helper for the common cases. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
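The proposed API shape can be sketched like this: static helpers for the common cases make the required settings explicit instead of hiding them in a configuration object. Class and method names here are hypothetical.

```java
// Sketch of the proposed constructor split: static helpers for the common
// cases instead of one opaque configuration object. Hypothetical names.
class StateClient {
    private final String jobManagerHost;
    private final int jobManagerPort;

    private StateClient(String host, int port) {
        this.jobManagerHost = host;
        this.jobManagerPort = port;
    }

    // common case: connect to a known JobManager address
    static StateClient connectTo(String host, int port) {
        return new StateClient(host, port);
    }

    // common case: local defaults, e.g. for tests
    static StateClient localDefaults() {
        return connectTo("localhost", 6123);
    }

    String address() {
        return jobManagerHost + ":" + jobManagerPort;
    }
}
```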
[jira] [Assigned] (FLINK-5509) Replace QueryableStateClient keyHashCode argument
[ https://issues.apache.org/jira/browse/FLINK-5509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi reassigned FLINK-5509: -- Assignee: Ufuk Celebi > Replace QueryableStateClient keyHashCode argument > - > > Key: FLINK-5509 > URL: https://issues.apache.org/jira/browse/FLINK-5509 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi >Priority: Minor > > When going over the low level QueryableStateClient with [~NicoK] we noticed > that the key hashCode argument can be confusing to users: > {code} > Future<byte[]> getKvState( > JobID jobId, > String name, > int keyHashCode, > byte[] serializedKeyAndNamespace) > {code} > The {{keyHashCode}} argument is the result of calling {{hashCode()}} on the > key to look up. This is what is sent to the JobManager in order to look up > the location of the key. While pretty straight forward, it is repetitive and > possibly confusing. > As an alternative we suggest to make the method generic and simply call > hashCode on the object ourselves. This way the user just provides the key > object. > Since there are some early users of the queryable state API already, we would > suggest to rename the method in order to provoke a compilation error after > upgrading to the actually released 1.2 version. > (This would also work without renaming since the hashCode of Integer (what > users currently provide) is the same number, but it would be confusing why it > actually works.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
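The suggested change can be sketched as follows: the client method becomes generic over the key and computes the hash itself, so callers cannot pass a mismatched hash. Method names are illustrative; the real signature lives in QueryableStateClient.

```java
// Sketch of the suggested API change: generic over the key type, with the
// hash computed internally. Illustrative names, not the real client.
class TypedKeyQuery {
    // old style: the caller must supply key.hashCode() (repetitive, error-prone)
    static int lookupByHash(int keyHashCode) {
        return keyHashCode; // stand-in for the actual location lookup
    }

    // proposed style: generic over the key type, hash computed internally
    static <K> int lookup(K key) {
        return lookupByHash(key.hashCode());
    }
}
```

Note the quirk the ticket mentions: for an {{Integer}} key, {{hashCode()}} is the value itself, so old call sites passing the raw value would still "work" by accident.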
[jira] [Created] (FLINK-5603) Use Flink's futures in QueryableStateClient
Ufuk Celebi created FLINK-5603: -- Summary: Use Flink's futures in QueryableStateClient Key: FLINK-5603 URL: https://issues.apache.org/jira/browse/FLINK-5603 Project: Flink Issue Type: Improvement Components: Queryable State Reporter: Ufuk Celebi Assignee: Ufuk Celebi Priority: Minor The current {{QueryableStateClient}} exposes Scala's Futures as the return type for queries. Since we are trying to get away from hard Scala dependencies in the current master, we should proactively replace this with Flink's Future interface. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4220) ClientTest fails after port conflict
[ https://issues.apache.org/jira/browse/FLINK-4220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-4220. -- Resolution: Cannot Reproduce > ClientTest fails after port conflict > > > Key: FLINK-4220 > URL: https://issues.apache.org/jira/browse/FLINK-4220 > Project: Flink > Issue Type: Bug > Components: Client >Reporter: Ufuk Celebi > Labels: test-stability > > {code} > Running org.apache.flink.client.program.ClientTest > org.jboss.netty.channel.ChannelException: Failed to bind to: /127.0.0.1:42087 > at > org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272) > at > akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393) > at > akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389) > at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) > at scala.util.Try$.apply(Try.scala:161) > at scala.util.Success.map(Try.scala:206) > at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) > at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.net.BindException: Address already in use > at sun.nio.ch.Net.bind0(Native Method) > at sun.nio.ch.Net.bind(Net.java:444) > at sun.nio.ch.Net.bind(Net.java:436) > at > sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) > at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) > at > org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193) > at > org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372) > at > org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296) > at > org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > ... > Failed tests: > ClientTest.setUp:107 Setup of test actor system failed. > {code} > https://s3.amazonaws.com/archive.travis-ci.org/jobs/144842320/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5116) Missing RocksDB state backend factory
[ https://issues.apache.org/jira/browse/FLINK-5116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-5116. -- Resolution: Won't Fix Implemented in >= 1.2 > Missing RocksDB state backend factory > - > > Key: FLINK-5116 > URL: https://issues.apache.org/jira/browse/FLINK-5116 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.3 >Reporter: Ufuk Celebi >Priority: Critical > > The 1.1 series misses a factory for the RocksDB backend. This means that > 1.1 users cannot configure RocksDB via the config file and are forced to > set it via the environment. > This has been recently added to 1.2-SNAPSHOT. It should be backported to > 1.1.4 > (https://github.com/apache/flink/blob/master/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5436) UDF state without CheckpointedRestoring can result in restarting loop
[ https://issues.apache.org/jira/browse/FLINK-5436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833467#comment-15833467 ] Ufuk Celebi commented on FLINK-5436: [~stefanrichte...@gmail.com] and [~aljoscha] should we address this and FLINK-5437 for the upcoming release or not? > UDF state without CheckpointedRestoring can result in restarting loop > - > > Key: FLINK-5436 > URL: https://issues.apache.org/jira/browse/FLINK-5436 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Minor > > When restoring a job with Checkpointed state and not implementing the new > CheckpointedRestoring interface, the job will be restarted over and over > again (given the respective restarting strategy). > Since this is not recoverable, we should immediately fail the job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5530) race condition in AbstractRocksDBState#getSerializedValue
[ https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-5530. -- Resolution: Fixed Fix Version/s: 1.3.0 1.2.0 Fixed in d8222c1 (release-1.2), d16552d (master). > race condition in AbstractRocksDBState#getSerializedValue > - > > Key: FLINK-5530 > URL: https://issues.apache.org/jira/browse/FLINK-5530 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.2.0, 1.3.0 > > > AbstractRocksDBState#getSerializedValue() uses the same key serialisation > stream as the ordinary state access methods but is called in parallel during > state queries thus violating the assumption of only one thread accessing it. > This may lead to either wrong results in queries or corrupt data while > queries are executed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
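The fix direction for this race can be sketched with a stand-alone example (illustrative, not the actual RocksDB backend code): instead of one serialization stream shared between state access and query threads, each call builds its own call-local stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the fix direction, not the actual backend code: each call uses
// a fresh, call-local serialization stream, so concurrent state queries
// cannot corrupt each other's output.
class PerCallSerializer {
    static byte[] serializeKey(int key) throws IOException {
        // fresh, call-local stream: no mutable state shared between threads
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeInt(key);
        return bos.toByteArray();
    }

    static int deserializeKey(byte[] bytes) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(bytes)).readInt();
    }
}
```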
[jira] [Commented] (FLINK-5530) race condition in AbstractRocksDBState#getSerializedValue
[ https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833448#comment-15833448 ] ASF GitHub Bot commented on FLINK-5530: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3143 > race condition in AbstractRocksDBState#getSerializedValue > - > > Key: FLINK-5530 > URL: https://issues.apache.org/jira/browse/FLINK-5530 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.2.0, 1.3.0 > > > AbstractRocksDBState#getSerializedValue() uses the same key serialisation > stream as the ordinary state access methods but is called in parallel during > state queries thus violating the assumption of only one thread accessing it. > This may lead to either wrong results in queries or corrupt data while > queries are executed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3143 ---