[jira] [Commented] (FLINK-2991) Extend Window Operators to Allow Efficient Fold Operation
[ https://issues.apache.org/jira/browse/FLINK-2991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140460#comment-15140460 ]

ASF GitHub Bot commented on FLINK-2991:
---------------------------------------

Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1605#issuecomment-182237986

I'll add docs and also fix some other stuff in the docs that wasn't updated.

> Extend Window Operators to Allow Efficient Fold Operation
> ---------------------------------------------------------
>
>                 Key: FLINK-2991
>                 URL: https://issues.apache.org/jira/browse/FLINK-2991
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> Right now, a window fold is implemented as a WindowFunction that gets all the
> elements as input. No pre-aggregation is performed. The window operator
> should be extended to allow the fold to also be pre-aggregated.
> This requires changing the signature of the {{WindowBuffer}} so that it can
> emit a type other than the input type.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
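For context, the difference the issue describes can be sketched outside of Flink's API: a buffering window stores every element and folds them only when the window fires, while a pre-aggregating buffer folds each element into the accumulator as it arrives, so the buffer emits a type (ACC) different from its input type (IN). The `FoldingBuffer` class below is a purely illustrative stand-in, not Flink's actual `WindowBuffer` interface:

```java
import java.util.function.BiFunction;

/**
 * Illustrative pre-aggregating window buffer: instead of storing every
 * element and folding on emit, each element is folded into the accumulator
 * immediately. Note the emitted type (ACC) differs from the input type (IN),
 * which is the signature change the issue asks for.
 */
class FoldingBuffer<IN, ACC> {
	private ACC accumulator;
	private final BiFunction<ACC, IN, ACC> foldFunction;

	FoldingBuffer(ACC initialValue, BiFunction<ACC, IN, ACC> foldFunction) {
		this.accumulator = initialValue;
		this.foldFunction = foldFunction;
	}

	/** Called once per arriving element; no per-element storage is kept. */
	void add(IN element) {
		accumulator = foldFunction.apply(accumulator, element);
	}

	/** Called when the window fires; returns the pre-aggregated result. */
	ACC emit() {
		return accumulator;
	}
}
```

For example, folding arriving Integers into a String accumulator only ever holds one String, regardless of how many elements the window receives.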
[jira] [Commented] (FLINK-3035) Redis as State Backend
[ https://issues.apache.org/jira/browse/FLINK-3035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140239#comment-15140239 ]

Subhobrata Dey commented on FLINK-3035:
---------------------------------------

Hello [~mjsax],

Thanks for replying. I adopted & created a PR out of my implementation. Would love to know your views.

> Redis as State Backend
> ----------------------
>
>                 Key: FLINK-3035
>                 URL: https://issues.apache.org/jira/browse/FLINK-3035
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Matthias J. Sax
>            Assignee: Subhobrata Dey
>            Priority: Minor
>
> Add Redis as a state backend for distributed snapshots.
[GitHub] flink pull request: [FLINK-3035] Redis as State Backend
GitHub user sbcd90 opened a pull request:

    https://github.com/apache/flink/pull/1617

    [FLINK-3035] Redis as State Backend

    @mjsax please review.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sbcd90/flink FLINK-3035

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1617.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1617

----
commit 5a4b2f09e6990185ca6cdf3d91a4561dcb23098b
Author: Subhobrata Dey
Date:   2016-02-10T01:33:13Z

    [FLINK-3035] Redis as State Backend

---

If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
[jira] [Commented] (FLINK-2991) Extend Window Operators to Allow Efficient Fold Operation
[ https://issues.apache.org/jira/browse/FLINK-2991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139965#comment-15139965 ]

ASF GitHub Bot commented on FLINK-2991:
---------------------------------------

Github user rmetzger commented on the pull request:
https://github.com/apache/flink/pull/1605#issuecomment-182125713

You are adding new `.apply()` variants to the API which are not documented with the PR.
[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable
[ https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139953#comment-15139953 ]

ASF GitHub Bot commented on FLINK-2523:
---------------------------------------

Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/1612#discussion_r52388927

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@

 	/**
-	 * Constructs a new job graph with no name and a random job ID.
+	 * Constructs a new job graph with no name, a random job ID, and the default
+	 * {@link ExecutionConfig} parameters.
 	 */
 	public JobGraph() {
 		this((String) null);
 	}

 	/**
-	 * Constructs a new job graph with the given name, a random job ID.
+	 * Constructs a new job graph with the given name, a random job ID and the default
+	 * {@link ExecutionConfig} parameters.
 	 *
 	 * @param jobName The name of the job
 	 */
 	public JobGraph(String jobName) {
-		this(null, jobName);
+		this(null, jobName, new ExecutionConfig());
+	}
+
+	/**
+	 * Constructs a new job graph with the given job ID, the given name, and the default
+	 * {@link ExecutionConfig} parameters.
+	 *
+	 * @param jobID The id of the job. A random ID is generated, if {@code null} is passed.
+	 * @param jobName The name of the job.
+	 */
+	public JobGraph(JobID jobID, String jobName) {
+		this(jobID, jobName, new ExecutionConfig());
 	}

 	/**
-	 * Constructs a new job graph with the given name and a random job ID if null supplied as an id.
+	 * Constructs a new job graph with the given name, the given {@link ExecutionConfig},
+	 * and a random job ID.
+	 *
+	 * @param jobName The name of the job.
+	 * @param config The execution configuration of the job.
+	 */
+	public JobGraph(String jobName, ExecutionConfig config) {
+		this(null, jobName, config);
+	}
+
+	/**
+	 * Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
+	 * the given name and the given execution configuration (see {@link ExecutionConfig}).
 	 *
 	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
 	 * @param jobName The name of the job.
+	 * @param config The execution configuration of the job.
 	 */
-	public JobGraph(JobID jobId, String jobName) {
+	public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
 		this.jobID = jobId == null ? new JobID() : jobId;
 		this.jobName = jobName == null ? "(unnamed job)" : jobName;
+		this.executionConfig = config;

--- End diff --

It cannot, I just added a check.

> Make task canceling interrupt interval configurable
> ---------------------------------------------------
>
>                 Key: FLINK-2523
>                 URL: https://issues.apache.org/jira/browse/FLINK-2523
>             Project: Flink
>          Issue Type: Improvement
>          Components: TaskManager
>    Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Klou
>             Fix For: 1.0.0
>
> When a task is canceled, the cancellation periodically calls "interrupt()" on
> the task thread if the task thread does not cancel within a certain time.
> Currently, this value is hard-coded to 10 seconds. We should make that time
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks
> (here I am observing it for Kafka consumers) can take longer than 10 seconds
> for proper cleanup.
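The behavior the ticket describes can be sketched in plain Java: after cancellation, a watchdog repeatedly interrupts the task thread at a configurable interval until the task actually exits, instead of the hard-coded 10-second interval. The `TaskCanceller` class below is an illustrative stand-in, not Flink's actual Task implementation:

```java
/**
 * Illustrative canceller: interrupts the task thread every
 * interruptIntervalMillis until it terminates, so a slow-to-clean-up
 * task (e.g. a Kafka consumer) keeps getting nudged rather than
 * being interrupted once and forgotten.
 */
class TaskCanceller implements Runnable {
	private final Thread taskThread;
	private final long interruptIntervalMillis; // configurable, per FLINK-2523

	TaskCanceller(Thread taskThread, long interruptIntervalMillis) {
		this.taskThread = taskThread;
		this.interruptIntervalMillis = interruptIntervalMillis;
	}

	@Override
	public void run() {
		while (taskThread.isAlive()) {
			taskThread.interrupt();
			try {
				// wait up to one interval for the task to die, then retry
				taskThread.join(interruptIntervalMillis);
			} catch (InterruptedException e) {
				// the canceller itself was interrupted; stop trying
				return;
			}
		}
	}
}
```

The loop terminates as soon as the task thread dies, so a well-behaved task is interrupted only once; a task that swallows interrupts keeps receiving them every interval.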
[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/1612#discussion_r52388853

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@

 	/**
-	 * Constructs a new job graph with the given name, a random job ID.
+	 * Constructs a new job graph with the given name, a random job ID and the default
+	 * {@link ExecutionConfig} parameters.
 	 *
 	 * @param jobName The name of the job
 	 */
 	public JobGraph(String jobName) {
-		this(null, jobName);
+		this(null, jobName, new ExecutionConfig());
+	}
+
+	/**
+	 * Constructs a new job graph with the given job ID, the given name, and the default
+	 * {@link ExecutionConfig} parameters.
+	 *
+	 * @param jobID The id of the job. A random ID is generated, if {@code null} is passed.
+	 * @param jobName The name of the job.
+	 */
+	public JobGraph(JobID jobID, String jobName) {
+		this(jobID, jobName, new ExecutionConfig());
--- End diff --

Same as before.
[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable
[ https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139949#comment-15139949 ]

ASF GitHub Bot commented on FLINK-2523:
---------------------------------------

Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/1612#discussion_r52388830

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@

 	/**
-	 * Constructs a new job graph with the given name, a random job ID.
+	 * Constructs a new job graph with the given name, a random job ID and the default
+	 * {@link ExecutionConfig} parameters.
 	 *
 	 * @param jobName The name of the job
 	 */
 	public JobGraph(String jobName) {
-		this(null, jobName);
+		this(null, jobName, new ExecutionConfig());
--- End diff --

Again, the constructors that do not specify an ExecutionConfig exist only for tests. The empty ExecutionConfig here is just to avoid breaking the already existing tests that were testing other parts of the functionality.
[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable
[ https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139942#comment-15139942 ]

ASF GitHub Bot commented on FLINK-2523:
---------------------------------------

Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/1612#discussion_r52388350

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---
@@ -141,9 +146,16 @@ public TaskDeploymentDescriptor(
 		List requiredJarFiles,
 		List requiredClasspaths,
 		int targetSlotNumber) {
-		this(appId, jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber,
-			jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
-			inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1);
+		this(appId, jobID, vertexID, executionId, new ExecutionConfig(), taskName, indexInSubtaskGroup,
--- End diff --

This constructor only exists for tests. The executionConfig in the TDD is the change made by this pull request, so the new ExecutionConfig here is just to avoid breaking the already existing tests that were testing other parts of the functionality of the TDD. New tests are added to test the new functionality.
[GitHub] flink pull request: [FLINK-2991] Add Folding State and use in Wind...
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/1605#discussion_r52388115

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ReducingState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend>
+	extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend>
+	implements FoldingState<T, ACC> {
+
+	/** Serializer for the values */
+	private final TypeSerializer<ACC> valueSerializer;
+
+	/** This holds the name of the state and can create an initial default value for the state. */
+	protected final FoldingStateDescriptor<T, ACC> stateDesc;
+
+	/** User-specified fold function */
+	private final FoldFunction<T, ACC> foldFunction;
+
+	/**
+	 * Creates a new {@code RocksDBFoldingState}.
+	 *
+	 * @param keySerializer The serializer for the keys.
+	 * @param namespaceSerializer The serializer for the namespace.
+	 * @param stateDesc The state identifier for the state. This contains name
+	 *                  and can create a default state value.
+	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 */
+	protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			FoldingStateDescriptor<T, ACC> stateDesc,
+			File dbPath,
+			String backupPath) {
+		super(keySerializer, namespaceSerializer, dbPath, backupPath);
+		this.stateDesc = requireNonNull(stateDesc);
+		this.valueSerializer = stateDesc.getSerializer();
+		this.foldFunction = stateDesc.getFoldFunction();
+	}
+
+	protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			FoldingStateDescriptor<T, ACC> stateDesc,
+			File dbPath,
+			String backupPath,
+			String restorePath) {
+		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+		this.stateDesc = stateDesc;
+		this.valueSerializer = stateDesc.getSerializer();
+		this.foldFunction = stateDesc.getFoldFunction();
+	}
+
+	@Override
+	public ACC get() {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		try {
+			writeKeyAndNamespace(out);
+			byte[] key = baos.toByteArray();
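Independent of RocksDB, the FoldingState contract the class implements can be sketched with an in-memory map: add() folds a new value into the accumulator stored under the current key, and get() returns that accumulator. The `InMemoryFoldingState` class and its map-backed store are illustrative stand-ins for the serialized RocksDB column, not Flink's actual state classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * In-memory sketch of FoldingState semantics: one accumulator of type ACC
 * per key, updated by folding each added value of type T into it.
 */
class InMemoryFoldingState<K, T, ACC> {
	private final Map<K, ACC> store = new HashMap<>();
	private final ACC defaultValue;
	private final BiFunction<ACC, T, ACC> foldFunction;
	private K currentKey;

	InMemoryFoldingState(ACC defaultValue, BiFunction<ACC, T, ACC> foldFunction) {
		this.defaultValue = defaultValue;
		this.foldFunction = foldFunction;
	}

	/** Scopes subsequent add()/get() calls to this key. */
	void setCurrentKey(K key) {
		this.currentKey = key;
	}

	/** Folds value into the accumulator stored for the current key. */
	void add(T value) {
		ACC acc = store.getOrDefault(currentKey, defaultValue);
		store.put(currentKey, foldFunction.apply(acc, value));
	}

	/** Returns the accumulator for the current key (default if none yet). */
	ACC get() {
		return store.getOrDefault(currentKey, defaultValue);
	}
}
```

The RocksDB version in the diff does the same thing, except the key plus namespace are serialized into the RocksDB key and the accumulator is serialized into the RocksDB value.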
[jira] [Commented] (FLINK-3366) Rename @Experimental annotation to @PublicEvolving
[ https://issues.apache.org/jira/browse/FLINK-3366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139929#comment-15139929 ]

ASF GitHub Bot commented on FLINK-3366:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1599#issuecomment-182115171

    +1 to merge

> Rename @Experimental annotation to @PublicEvolving
> --------------------------------------------------
>
>             Key: FLINK-3366
>             URL: https://issues.apache.org/jira/browse/FLINK-3366
>         Project: Flink
>      Issue Type: Task
> Affects Versions: 1.0.0
>        Reporter: Fabian Hueske
>        Assignee: Fabian Hueske
>        Priority: Blocker
>         Fix For: 1.0.0
>
> As per discussion on the dev ML, rename the @Experimental annotation to
> @PublicEvolving.
> Experimental might suggest unstable / unreliable functionality, which is not
> the intended meaning of this annotation.
[jira] [Commented] (FLINK-3372) Setting custom YARN application name is ignored
[ https://issues.apache.org/jira/browse/FLINK-3372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139922#comment-15139922 ]

ASF GitHub Bot commented on FLINK-3372:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1607#issuecomment-182114169

    +1 to merge

> Setting custom YARN application name is ignored
> -----------------------------------------------
>
>             Key: FLINK-3372
>             URL: https://issues.apache.org/jira/browse/FLINK-3372
>         Project: Flink
>      Issue Type: Bug
>      Components: YARN Client
> Affects Versions: 0.10.1
>        Reporter: Nick Dimiduk
>
> The {{-ynm}} optional argument is ignored. From my debugging,
> FlinkYarnClientBase does the right thing to parse and set the value.
> CliFrontend ignores this parsed value, always calling
> {noformat}
> flinkYarnClient.setName("Flink Application: " + programName);
> {noformat}
> down in {{getClient(CommandLineOptions, String, int)}}. Thus every job
> submission to YARN is identifiable only by its classname.
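The fix the report points at is small: only fall back to the generated name when no custom name was supplied. A hypothetical sketch of that selection logic (the class and method names here are made up for illustration; this is not the actual CliFrontend code):

```java
/** Illustrative helper for choosing the YARN application name. */
class YarnAppNameChooser {

	/**
	 * Returns the user-supplied -ynm name if one was given,
	 * otherwise falls back to the name derived from the program.
	 */
	static String choose(String customName, String programName) {
		if (customName != null && !customName.isEmpty()) {
			return customName;
		}
		return "Flink Application: " + programName;
	}
}
```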
[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1612#discussion_r52385698

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---
@@ -141,9 +146,16 @@ public TaskDeploymentDescriptor(
 		List requiredJarFiles,
 		List requiredClasspaths,
 		int targetSlotNumber) {
-	this(appId, jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber,
-		jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
-		inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1);
+	this(appId, jobID, vertexID, executionId, new ExecutionConfig(), taskName, indexInSubtaskGroup,
--- End diff --

why is there a constructor variant of the TDD that creates an empty execution config?
[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1612#discussion_r52385188

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---
@@ -75,16 +76,22 @@ public void testExecutionConfigSerialization() throws IOException, ClassNotFound
 			config.disableSysoutLogging();
 		}
 		config.setParallelism(dop);
-
 		JobGraph jobGraph = compiler.createJobGraph("test");
-
+
+		final String exec_config_key = "runtime.execconfig";
--- End diff --

variable name is not java style
[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable
[ https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139913#comment-15139913 ]

ASF GitHub Bot commented on FLINK-2523:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1612#discussion_r52386582

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
 	// --------------------------------------------------------------------
 
 	/**
-	 * Constructs a new job graph with no name and a random job ID.
+	 * Constructs a new job graph with no name, a random job ID, and the default
+	 * {@link ExecutionConfig} parameters.
 	 */
 	public JobGraph() {
 		this((String) null);
 	}
 
 	/**
-	 * Constructs a new job graph with the given name, a random job ID.
+	 * Constructs a new job graph with the given name, a random job ID and the default
+	 * {@link ExecutionConfig} parameters.
 	 *
 	 * @param jobName The name of the job
 	 */
 	public JobGraph(String jobName) {
-		this(null, jobName);
+		this(null, jobName, new ExecutionConfig());
+	}
+
+	/**
+	 * Constructs a new job graph with the given job ID, the given name, and the default
+	 * {@link ExecutionConfig} parameters.
+	 *
+	 * @param jobID The id of the job. A random ID is generated, if {@code null} is passed.
+	 * @param jobName The name of the job.
+	 */
+	public JobGraph(JobID jobID, String jobName) {
+		this(jobID, jobName, new ExecutionConfig());
--- End diff --

same Q here

> Make task canceling interrupt interval configurable
> ---------------------------------------------------
>
>             Key: FLINK-2523
>             URL: https://issues.apache.org/jira/browse/FLINK-2523
>         Project: Flink
>      Issue Type: Improvement
>      Components: TaskManager
> Affects Versions: 0.10.0
>        Reporter: Stephan Ewen
>        Assignee: Klou
>         Fix For: 1.0.0
>
> When a task is canceled, the cancellation calls "interrupt()" periodically on
> the task thread, if the task thread does not cancel within a certain time.
> Currently, this value is hard coded to 10 seconds. We should make that time
> configurable.
> Until then, I would like to increase the value to 30 seconds, as many tasks
> (here I am observing it for Kafka consumers) can take longer than 10 seconds
> for proper cleanup.
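The mechanism being made configurable can be modeled in a few lines: after cancellation is triggered once, a watchdog interrupts the task thread repeatedly at the given interval until the thread actually terminates. This is a simplified sketch of that behavior, not Flink's actual Task code:

```java
/**
 * Simplified model of task cancellation: interrupt the task thread
 * periodically (at a configurable interval) until it has terminated.
 */
class TaskCanceler implements Runnable {

	private final Thread taskThread;
	private final long interruptIntervalMillis; // previously hard coded to 10 seconds

	TaskCanceler(Thread taskThread, long interruptIntervalMillis) {
		this.taskThread = taskThread;
		this.interruptIntervalMillis = interruptIntervalMillis;
	}

	@Override
	public void run() {
		while (taskThread.isAlive()) {
			// interrupt the task, then give it interruptIntervalMillis to clean up
			taskThread.interrupt();
			try {
				taskThread.join(interruptIntervalMillis);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return;
			}
		}
	}
}
```

A larger interval gives sources such as the Kafka consumer more time to finish cleanup between interrupts, which is exactly why the issue asks for it to be configurable.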
[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1612#discussion_r52386295

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
 	// --------------------------------------------------------------------
 
 	/**
-	 * Constructs a new job graph with no name and a random job ID.
+	 * Constructs a new job graph with no name, a random job ID, and the default
+	 * {@link ExecutionConfig} parameters.
 	 */
 	public JobGraph() {
 		this((String) null);
 	}
 
 	/**
-	 * Constructs a new job graph with the given name, a random job ID.
+	 * Constructs a new job graph with the given name, a random job ID and the default
+	 * {@link ExecutionConfig} parameters.
 	 *
 	 * @param jobName The name of the job
 	 */
 	public JobGraph(String jobName) {
-		this(null, jobName);
+		this(null, jobName, new ExecutionConfig());
--- End diff --

Why are you creating an empty execution config here? Is this the execution config which will be passed to the user in the end?
[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1612#discussion_r52385934

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -102,60 +101,150 @@
 	// --------------------------------------------------------------------
 
 	/**
-	 * Constructs a new job graph with no name and a random job ID.
+	 * Constructs a new job graph with no name, a random job ID, and the default
+	 * {@link ExecutionConfig} parameters.
 	 */
 	public JobGraph() {
 		this((String) null);
 	}
 
 	/**
-	 * Constructs a new job graph with the given name, a random job ID.
+	 * Constructs a new job graph with the given name, a random job ID and the default
+	 * {@link ExecutionConfig} parameters.
 	 *
 	 * @param jobName The name of the job
 	 */
 	public JobGraph(String jobName) {
-		this(null, jobName);
+		this(null, jobName, new ExecutionConfig());
+	}
+
+	/**
+	 * Constructs a new job graph with the given job ID, the given name, and the default
+	 * {@link ExecutionConfig} parameters.
+	 *
+	 * @param jobID The id of the job. A random ID is generated, if {@code null} is passed.
+	 * @param jobName The name of the job.
+	 */
+	public JobGraph(JobID jobID, String jobName) {
+		this(jobID, jobName, new ExecutionConfig());
 	}
 
 	/**
-	 * Constructs a new job graph with the given name and a random job ID if null supplied as an id.
+	 * Constructs a new job graph with the given name, the given {@link ExecutionConfig},
+	 * and a random job ID.
+	 *
+	 * @param jobName The name of the job.
+	 * @param config The execution configuration of the job.
+	 */
+	public JobGraph(String jobName, ExecutionConfig config) {
+		this(null, jobName, config);
+	}
+
+	/**
+	 * Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
+	 * the given name and the given execution configuration (see {@link ExecutionConfig}).
 	 *
 	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
 	 * @param jobName The name of the job.
+	 * @param config The execution configuration of the job.
 	 */
-	public JobGraph(JobID jobId, String jobName) {
+	public JobGraph(JobID jobId, String jobName, ExecutionConfig config) {
 		this.jobID = jobId == null ? new JobID() : jobId;
 		this.jobName = jobName == null ? "(unnamed job)" : jobName;
+		this.executionConfig = config;
+	}
+
+	/**
+	 * Constructs a new job graph containing the provided single {@code JobVertex} (see {@link JobVertex}),
+	 * with no name, a random ID, and the default execution configuration (see {@link ExecutionConfig}).
+	 *
+	 * @param vertex The single vertex of the graph.
+	 */
+	public JobGraph(JobVertex vertex) {
+		this(null, Collections.singletonList(vertex));
+	}
+
+	/**
+	 * Constructs a new job graph containing the provided single {@code JobVertex} (see {@link JobVertex}),
+	 * with the given name, a random ID, and the default execution configuration (see {@link ExecutionConfig}).
+	 *
+	 * @param jobName The name of the job.
+	 * @param vertex The single vertex of the graph.
+	 */
+	public JobGraph(String jobName, JobVertex vertex) {
+		this(jobName, Collections.singletonList(vertex));
+	}
+
+	/**
+	 * Constructs a new job graph containing the provided two {@code JobVertices} (see {@link JobVertex}),
+	 * with the given name, a random ID, and the default execution configuration (see {@link ExecutionConfig}).
+	 *
+	 * @param jobName The name of the job.
+	 * @param vertex1 The first vertex of the graph.
+	 * @param vertex2 The second vertex of the graph.
+	 */
+	public JobGraph(String jobName, JobVertex vertex1, JobVertex vertex2) {
+		this(jobName, Arrays.asList(vertex1, vertex2));
 	}
 
 	/**
-	 * Constructs a new job graph with no name and a random job ID if null supplied as an id.
+	 * Constructs a new job graph containing the provided {@code JobVertices} (see {@link JobVertex}),
+	 * with no name, a random job ID, and the default execution configuration (see {@link ExecutionConfig}).
 	 *
 	 * @param vertices The vertices to add to the graph.
 	 */
-	public JobGraph(JobVertex... vertices) {
--- End diff --

Why did you remove the varargs constructor?
[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1612#discussion_r52385415

--- Diff: docs/apis/batch/index.md ---
@@ -2123,7 +2123,7 @@ Note that types registered with `registerKryoType()` are not available to Flink'
 
 - `disableAutoTypeRegistration()` Automatic type registration is enabled by default. The automatic type registration is registering all types (including sub-types) used by usercode with Kryo and the POJO serializer.
-
+- `setTaskCancellationInterval(long interval)` Sets the interval (in milliseconds) to wait between consecutive attempts to cancel a running task. By default this is set to **30000** milliseconds, or **30 seconds**.
--- End diff --

I would explain a bit more detailed what's going on. The `cancel()` method is called only once. The interval determines the interrupt frequency!
[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1612#discussion_r52385223

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---
@@ -27,13 +24,17 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.InstantiationUtil;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Random;
+
 import static org.junit.Assert.*;
 
-public class StreamingJobGraphGeneratorTest {
+public class
+StreamingJobGraphGeneratorTest {
--- End diff --

why is there a line break after the `class`?
[GitHub] flink pull request: FLINK-2380: allow to specify the default files...
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-182054657

    I'm testing the change on a cluster (with YARN) to see if everything is working as expected.
[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs
[ https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139750#comment-15139750 ]

ASF GitHub Bot commented on FLINK-2380:
---

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-182066864

Setting the value to `fs.default-scheme: thisIsWrong:///` is good:

```
robert@cdh544-master:~/flink/build-target$ ./bin/flink run ./examples/batch/WordCount.jar /user/robert/tpch-100lineitems.csv /user/robert/elasdoijwef

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter: ))': No file system found with scheme thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/elasdoijwef'.
	at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:804)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:331)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1127)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1175)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter: ))': No file system found with scheme thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/elasdoijwef'.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:981)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:965)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:965)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:378)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:105)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPoo
```
[jira] [Commented] (FLINK-3335) Fix DataSourceTask object reuse when disabled
[ https://issues.apache.org/jira/browse/FLINK-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139749#comment-15139749 ]

ASF GitHub Bot commented on FLINK-3335:
---

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/1616

[FLINK-3335] [runtime] Fix DataSourceTask object reuse when disabled

When object reuse is disabled, `DataSourceTask` now copies objects received from the `InputFormat` to prevent the collection of reused objects. An example where this is necessary is a `DataSet` created from a user implementation of `Iterator` which reuses a local object returned from `Iterator.next`.

Also, when object reuse is enabled, the cycling among three objects has been removed. I had added this a few months ago when starting to resolve an issue with reduce drivers.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 3335_fix_datasourcetask_object_reuse_when_disabled

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1616.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1616

commit 2678b9315a28ce27d888c7be53e5cce13b1afb35
Author: Greg Hogan
Date: 2016-02-09T13:18:28Z

    [FLINK-3335] [runtime] Fix DataSourceTask object reuse when disabled

    When object reuse is disabled, DataSourceTask now copies objects received from the InputFormat to prevent the collection of reused objects.
> Fix DataSourceTask object reuse when disabled
> -
>
> Key: FLINK-3335
> URL: https://issues.apache.org/jira/browse/FLINK-3335
> Project: Flink
> Issue Type: Bug
> Components: Distributed Runtime
> Affects Versions: 1.0.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
>
> From {{DataSourceTask.invoke()}}:
> {code}
> if ((returned = format.nextRecord(serializer.createInstance())) != null) {
>     output.collect(returned);
> }
> {code}
> The returned value ({{returned}}) must be copied rather than creating and passing in a new instance. The {{InputFormat}} interface only permits the given object to be used and does not require a new object to be returned otherwise.
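The bug can be reproduced without any Flink classes. The sketch below (all class names hypothetical, not Flink's actual code) shows why collecting a record that the source keeps reusing corrupts the output, and why copying before collecting fixes it:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ObjectReuseSketch {

    // A mutable record type, standing in for a deserialized element.
    public static class MutableRecord {
        public int value;
        public MutableRecord copy() {
            MutableRecord r = new MutableRecord();
            r.value = value;
            return r;
        }
    }

    // Simulates an input source that, as the InputFormat contract permits,
    // mutates and returns the same instance on every call to next().
    static Iterator<MutableRecord> reusingSource(int n) {
        MutableRecord reused = new MutableRecord();
        return new Iterator<MutableRecord>() {
            int i = 0;
            public boolean hasNext() { return i < n; }
            public MutableRecord next() { reused.value = i++; return reused; }
        };
    }

    // With object reuse disabled, the task must copy before collecting;
    // otherwise the output holds n references to one repeatedly mutated object.
    public static List<Integer> collect(int n, boolean copyBeforeCollect) {
        List<MutableRecord> out = new ArrayList<>();
        Iterator<MutableRecord> source = reusingSource(n);
        while (source.hasNext()) {
            MutableRecord rec = source.next();
            out.add(copyBeforeCollect ? rec.copy() : rec);
        }
        List<Integer> values = new ArrayList<>();
        for (MutableRecord r : out) {
            values.add(r.value);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(collect(3, false)); // every entry reflects the last mutation
        System.out.println(collect(3, true));  // copying preserves the distinct values
    }
}
```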
[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs
[ https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139744#comment-15139744 ]

ASF GitHub Bot commented on FLINK-2380:
---

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-182065768

I identified the following issues:
- Setting the configuration using the yarn session "dynamic properties": `./bin/yarn-session.sh -n 2 -Dfs.default-scheme=hdfs:///` does not really work (the configuration parameter shows up in the web interface, but the job fails)
- Setting a false scheme leads to a null pointer exception on job submission. In the flink-conf.yaml, I have `fs.default-scheme: thisIsWrong`. Look at this:

```
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter: ))': null
	at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:804)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:331)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1127)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1175)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter: ))': null
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:981)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:965)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:965)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:378)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:105)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
```
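The `': null'` in the message above indicates the unknown scheme surfaced as a NullPointerException rather than a descriptive error. A defensive sketch of scheme resolution that fails loudly instead (the class, registry, and method names here are hypothetical, not Flink's actual code):

```java
import java.net.URI;
import java.util.Map;

public class SchemeCheckSketch {

    // Hypothetical registry of known filesystem schemes.
    static final Map<String, String> KNOWN = Map.of(
            "file", "LocalFileSystem",
            "hdfs", "HadoopFileSystem");

    // Resolve a URI's scheme, throwing a descriptive exception for an
    // unknown scheme instead of letting a null lookup result surface
    // later as a NullPointerException.
    public static String resolve(URI uri) {
        String scheme = uri.getScheme() == null ? "file" : uri.getScheme();
        String fs = KNOWN.get(scheme);
        if (fs == null) {
            throw new IllegalArgumentException(
                    "No file system found with scheme " + scheme
                    + ", referenced in file URI '" + uri + "'.");
        }
        return fs;
    }
}
```

This mirrors the clear error message shown for the same misconfiguration elsewhere in this thread ("No file system found with scheme thisIsWrong, ...").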
[jira] [Commented] (FLINK-2380) Allow to configure default FS for file inputs
[ https://issues.apache.org/jira/browse/FLINK-2380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139717#comment-15139717 ]

ASF GitHub Bot commented on FLINK-2380:
---

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1524#issuecomment-182054657

I'm testing the change on a cluster (with YARN) to see if everything is working as expected.

> Allow to configure default FS for file inputs
> -
>
> Key: FLINK-2380
> URL: https://issues.apache.org/jira/browse/FLINK-2380
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
> Affects Versions: 0.9, 0.10.0
> Reporter: Ufuk Celebi
> Assignee: Klou
> Priority: Minor
> Labels: starter
> Fix For: 1.0.0
>
> File inputs use "file://" as default prefix. A user asked to make this configurable, e.g. "hdfs://" as default.
> (I'm not sure whether this is already possible or not. I will check and either close or implement this for the user.)
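The setting under test in the comments above lives in flink-conf.yaml. A minimal sketch (the `hdfs:///` value is illustrative, matching the test commands in this thread):

```yaml
# flink-conf.yaml (sketch): default scheme applied to paths that do not
# carry one, so a path like "/user/robert/data" would resolve against
# "hdfs:///" rather than the built-in "file://" default.
fs.default-scheme: hdfs:///
```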
[jira] [Commented] (FLINK-3260) ExecutionGraph gets stuck in state FAILING
[ https://issues.apache.org/jira/browse/FLINK-3260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139701#comment-15139701 ]

ASF GitHub Bot commented on FLINK-3260:
---

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1613#issuecomment-182046481

Looks good, with one inline comment. Otherwise, +1 to merge

> ExecutionGraph gets stuck in state FAILING
> -
>
> Key: FLINK-3260
> URL: https://issues.apache.org/jira/browse/FLINK-3260
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 0.10.1
> Reporter: Stephan Ewen
> Assignee: Till Rohrmann
> Priority: Blocker
> Fix For: 1.0.0
>
> It is a bit of a rare case, but the following can currently happen:
> 1. A job runs for a while; some tasks are already finished.
> 2. The job fails, goes to the states FAILING and RESTARTING. Non-finished tasks fail or are canceled.
> 3. For the finished tasks, ask-futures from certain messages (for example for releasing intermediate result partitions) can fail (timeout) and cause the execution to go from FINISHED to FAILED.
> 4. This triggers the execution graph to go to FAILING without ever going further into RESTARTING again.
> 5. The job is stuck.
> It initially looks like this is mainly an issue for batch jobs (jobs where tasks do finish, rather than run infinitely).
> The log that shows how this manifests:
> {code}
> 17:19:19,782 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
> 17:19:19,844 INFO  Remoting - Starting remoting
> 17:19:20,065 INFO  Remoting - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:56722]
> 17:19:20,090 INFO  org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-6766f51a-1c51-4a03-acfb-08c2c29c11f0
> 17:19:20,096 INFO  org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:43327 - max concurrent requests: 50 - max backlog: 1000
> 17:19:20,113 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist - Started memory archivist akka://flink/user/archive
> 17:19:20,115 INFO  org.apache.flink.runtime.checkpoint.SavepointStoreFactory - No savepoint state backend configured. Using job manager savepoint state backend.
> 17:19:20,118 INFO  org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka.tcp://flink@127.0.0.1:56722/user/jobmanager.
> 17:19:20,123 INFO  org.apache.flink.runtime.jobmanager.JobManager - JobManager akka.tcp://flink@127.0.0.1:56722/user/jobmanager was granted leadership with leader session ID None.
> 17:19:25,605 INFO  org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at testing-worker-linux-docker-e6d6931f-3200-linux-4 (akka.tcp://flink@172.17.0.253:43702/user/taskmanager) as f213232054587f296a12140d56f63ed1. Current number of registered hosts is 1. Current number of alive task slots is 2.
> 17:19:26,758 INFO  org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at testing-worker-linux-docker-e6d6931f-3200-linux-4 (akka.tcp://flink@172.17.0.253:43956/user/taskmanager) as f9e78baa14fb38c69517fb1bcf4f419c. Current number of registered hosts is 2. Current number of alive task slots is 4.
> 17:19:27,064 INFO  org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
> 17:19:27,071 INFO  org.apache.flink.client.program.Client - Starting client actor system
> 17:19:27,072 INFO  org.apache.flink.runtime.client.JobClient - Starting JobClient actor system
> 17:19:27,110 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
> 17:19:27,121 INFO  Remoting - Starting remoting
> 17:19:27,143 INFO  org.apache.flink.runtime.client.JobClient - Started JobClient actor system at 127.0.0.1:51198
> 17:19:27,145 INFO  Remoting - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:51198]
> 17:19:27,325 INFO  org.apache.flink.runtime.client.JobClientActor - Disconnect from JobManager null.
> 17:19:27,362 INFO  org.apache.flink.runtime.client.JobClientActor - Received job Flink Java
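The PR under review ("Enforce terminal state") addresses the FINISHED-to-FAILED transition described in the issue. A minimal sketch of the guard idea (the state names follow Flink's ExecutionState, but the class and guard logic here are illustrative, not Flink's actual code):

```java
public class TerminalStateSketch {

    public enum ExecutionState {
        CREATED, RUNNING, FINISHED, CANCELED, FAILED;

        // FINISHED, CANCELED and FAILED are terminal: once reached, no
        // further transition (e.g. FINISHED -> FAILED triggered by a
        // timed-out ask-future) may be applied.
        public boolean isTerminal() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }
    }

    private ExecutionState state = ExecutionState.CREATED;

    // Attempt a transition; reject any transition out of a terminal state
    // so a late failure message cannot move a finished execution to FAILED.
    public synchronized boolean transition(ExecutionState target) {
        if (state.isTerminal()) {
            return false;
        }
        state = target;
        return true;
    }

    public synchronized ExecutionState currentState() {
        return state;
    }
}
```

With such a guard, the scenario from the issue (step 3) becomes a rejected transition rather than a state change that strands the ExecutionGraph in FAILING.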
[jira] [Commented] (FLINK-3260) ExecutionGraph gets stuck in state FAILING
[ https://issues.apache.org/jira/browse/FLINK-3260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139699#comment-15139699 ]

ASF GitHub Bot commented on FLINK-3260:
---

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1613#discussion_r52368138

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -107,7 +108,7 @@
 	private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
-	private static final Logger LOG = ExecutionGraph.LOG;
+	private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
--- End diff --

Did this cause issues in this case? I originally set the logger to the ExecutionGraph logger to get all messages related to the execution and its changes in one log namespace. I always thought that makes searching the log easier.
[jira] [Commented] (FLINK-3373) Using a newer library of Apache HttpClient than 4.2.6 will get class loading problems
[ https://issues.apache.org/jira/browse/FLINK-3373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139694#comment-15139694 ] ASF GitHub Bot commented on FLINK-3373: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1615 [FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency This makes the HTTP Components dependency disappear from the core classpath, allowing users to use their own version of the dependency. We need shading because we cannot simply bump the HTTP Components version to the newest version. The YARN test for Hadoop version >= 2.6.0 fail in that case. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink http_shade Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1615.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1615 commit 1be39d12071c7251cd566e692c3a9c7b5440e46d Author: Stephan Ewen Date: 2016-02-09T20:18:43Z [FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency > Using a newer library of Apache HttpClient than 4.2.6 will get class loading > problems > - > > Key: FLINK-3373 > URL: https://issues.apache.org/jira/browse/FLINK-3373 > Project: Flink > Issue Type: Bug > Environment: Latest Flink snapshot 1.0 >Reporter: Jakob Sultan Ericsson > > When I trying to use Apache HTTP client 4.5.1 in my flink job it will crash > with NoClassDefFound. > This has to do that it load some classes from provided httpclient 4.2.5/6 in > core flink. > {noformat} > 17:05:56,193 INFO org.apache.flink.runtime.taskmanager.Task >- DuplicateFilter -> InstallKeyLookup (11/16) switched to FAILED with > exception. 
> java.lang.NoSuchFieldError: INSTANCE > at > org.apache.http.conn.ssl.SSLConnectionSocketFactory.<init>(SSLConnectionSocketFactory.java:144) > at > org.apache.http.impl.conn.PoolingHttpClientConnectionManager.getDefaultRegistry(PoolingHttpClientConnectionManager.java:109) > at > org.apache.http.impl.conn.PoolingHttpClientConnectionManager.<init>(PoolingHttpClientConnectionManager.java:116) > ... > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:305) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:227) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at java.lang.Thread.run(Thread.java:745) > {noformat} > {{SSLConnectionSocketFactory}} finds an earlier version of the > {{AllowAllHostnameVerifier}} that does not have the {{INSTANCE}} field (the > field was probably added in 4.3). > {noformat} > jar tvf lib/flink-dist-1.0-SNAPSHOT.jar |grep AllowAllHostnameVerifier >791 Thu Dec 17 09:55:46 CET 2015 > org/apache/http/conn/ssl/AllowAllHostnameVerifier.class > {noformat} > Solutions would be: > - Fix the classloader so that my custom job does not conflict with internal > flink-core classes... pretty hard > - Remove the dependency somehow. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3373] [build] Shade away Hadoop's HTTP ...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1615 [FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency This makes the HTTP Components dependency disappear from the core classpath, allowing users to use their own version of the dependency. We need shading because we cannot simply bump the HTTP Components version to the newest version. The YARN tests for Hadoop versions >= 2.6.0 fail in that case. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink http_shade Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1615.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1615 commit 1be39d12071c7251cd566e692c3a9c7b5440e46d Author: Stephan Ewen Date: 2016-02-09T20:18:43Z [FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency
[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139668#comment-15139668 ] Greg Hogan commented on FLINK-3333: --- I can think of four return values from a user defined {{reduce()}}: left, right, new object, or a long-lived local object. The long-lived user object can be later modified by the user, but storing and later modifying the left or right input objects is unsafe. That is what I consider an edge case, an action that no user is expected to perform, and that cannot be prevented without impacting performance (a.k.a. disabling object reuse). > Documentation about object reuse should be improved > --- > > Key: FLINK-3333 > URL: https://issues.apache.org/jira/browse/FLINK-3333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Blocker > Fix For: 1.0.0 > > > The documentation about object reuse \[1\] has several problems, see \[2\]. > \[1\] > https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior > \[2\] > https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
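The unsafe case Greg describes can be sketched in plain Java (illustrative stand-ins, not Flink's actual ReduceFunction API): with object reuse enabled, the runtime may hand the same mutable instance to the function repeatedly, so stored input references all alias one object.

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseSketch {
    static class Value { int v; Value(int v) { this.v = v; } }

    public static void main(String[] args) {
        Value reused = new Value(0);           // instance a reusing runtime hands out repeatedly
        List<Value> stored = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            reused.v = i;                      // fields are overwritten in place
            stored.add(reused);                // UNSAFE: every entry aliases one object
        }
        System.out.println(stored.get(0).v + " " + stored.get(1).v); // 3 3

        List<Value> copies = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            reused.v = i;
            copies.add(new Value(reused.v));   // SAFE: explicit copy into a user-owned object
        }
        System.out.println(copies.get(0).v + " " + copies.get(1).v); // 1 2
    }
}
```

The "long-lived local object" option is safe precisely because the user owns it and copies values into it, as the second loop shows.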
[jira] [Comment Edited] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139151#comment-15139151 ] Greg Hogan edited comment on FLINK-3333 at 2/9/16 8:07 PM: --- Apache Flink programs can be written and configured to reduce the number of object allocations for better performance. User defined functions (like map() or groupReduce()) process many millions or billions of input and output values. Enabling object reuse and processing mutable objects improves performance by lowering demand on the CPU cache and Java garbage collector. Object reuse is disabled by default, with user defined functions generally getting new objects on each call (or through an iterator). In this case it is safe to store references to the objects inside the function (for example, in a List). <'storing values in a list' example> Apache Flink will chain functions to improve performance when sorting is preserved and the parallelism unchanged. The chainable operators are Map, FlatMap, Reduce on full DataSet, GroupCombine on a Grouped DataSet, or a GroupReduce where the user supplied a RichGroupReduceFunction with a combine method. Objects are passed without copying _even when object reuse is disabled_. In the chaining case, the functions in the chain are receiving the same object instances. So the second map() function is receiving the objects the first map() is returning. This behavior can lead to errors when the first map() function keeps a list of all objects and the second mapper is modifying objects. In that case, the user has to manually create copies of the objects before putting them into the list. There is a switch at the ExecutionConfig which allows users to enable the object reuse mode (enableObjectReuse()). For mutable types, Flink will reuse object instances. In practice that means that a user function will always receive the same object instance (with its fields set to new values). 
The object reuse mode will lead to better performance because fewer objects are created, but the user has to manually take care of what they are doing with the object references. was (Author: greghogan): Apache Flink programs can be written and configured to reduce the number of object allocations for better performance. User defined functions (like map() or groupReduce()) process many millions or billions of input and output values. Enabling object reuse and processing mutable objects improves performance by lowering demand on the CPU cache and Java garbage collector. Object reuse is disabled by default, with user defined functions generally getting new objects on each call (or through an iterator). In this case it is safe to store references to the objects inside the function (for example, in a List). <'storing values in a list' example> Apache Flink will chain functions to improve performance when sorting is preserved and the parallelism unchanged. The chainable operators are Map, FlatMap, Reduce on full DataSet, GroupCombine on a Grouped DataSet, or a GroupReduce where the user supplied a RichGroupReduceFunction with a combine method). Objects are passed without copying _even when object reuse is disabled_. In the chaining case, the functions in the chain are receiving the same object instances. So the the second map() function is receiving the objects the first map() is returning. This behavior can lead to errors when the first map() function keeps a list of all objects and the second mapper is modifying objects. In that case, the user has to manually create copies of the objects before putting them into the list. There is a switch at the ExecutionConfig which allows users to enable the object reuse mode (enableObjectReuse()). For mutable types, Flink will reuse object instances. In practice that means that a user function will always receive the same object instance (with its fields set to new values). 
The object reuse mode will lead to better performance because fewer objects are created, but the user has to manually take care of what they are doing with the object references. > Documentation about object reuse should be improved > --- > > Key: FLINK-3333 > URL: https://issues.apache.org/jira/browse/FLINK-3333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Blocker > Fix For: 1.0.0 > > > The documentation about object reuse \[1\] has several problems, see \[2\]. > \[1\] > https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior > \[2\] > https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
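The chaining pitfall described above (a first mapper storing records that a second, chained mapper then mutates) can be sketched with plain Java function composition; these are hypothetical stand-ins, not Flink's actual operator-chaining code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class ChainSketch {
    static class Rec { int v; Rec(int v) { this.v = v; } }

    /** Runs three records through two chained "mappers" and returns what the first stored. */
    static List<Rec> run() {
        List<Rec> seenByFirst = new ArrayList<>();
        UnaryOperator<Rec> first = r -> { seenByFirst.add(r); return r; };  // keeps a reference
        UnaryOperator<Rec> second = r -> { r.v *= 10; return r; };          // mutates in place
        for (int i = 1; i <= 3; i++) {
            second.apply(first.apply(new Rec(i)));  // same instance flows through the chain
        }
        return seenByFirst;
    }

    public static void main(String[] args) {
        // The first mapper's stored records reflect the second mapper's mutations.
        System.out.println(run().get(0).v); // 10, not 1
    }
}
```

Copying the record before adding it to `seenByFirst` (e.g. `seenByFirst.add(new Rec(r.v))`) is the manual fix the comment recommends.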
[jira] [Closed] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
[ https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-3382. - Resolution: Not A Problem Per the comment on the pull request, this change would interfere with proper use of the iterator. > Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper > - > > Key: FLINK-3382 > URL: https://issues.apache.org/jira/browse/FLINK-3382 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be > clarified by creating a single object and storing the iterator's next value > into the second reference. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
[ https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139653#comment-15139653 ] ASF GitHub Bot commented on FLINK-3382: --- Github user greghogan closed the pull request at: https://github.com/apache/flink/pull/1614 > Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper > - > > Key: FLINK-3382 > URL: https://issues.apache.org/jira/browse/FLINK-3382 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be > clarified by creating a single object and storing the iterator's next value > into the second reference. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3382] Improve clarity of object reuse i...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1614#issuecomment-182035391 Ah, yes, now I see. I'll just burn this.
[jira] [Commented] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
[ https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139652#comment-15139652 ] ASF GitHub Bot commented on FLINK-3382: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1614#issuecomment-182035391 Ah, yes, now I see. I'll just burn this. > Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper > - > > Key: FLINK-3382 > URL: https://issues.apache.org/jira/browse/FLINK-3382 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be > clarified by creating a single object and storing the iterator's next value > into the second reference. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3382] Improve clarity of object reuse i...
Github user greghogan closed the pull request at: https://github.com/apache/flink/pull/1614
[GitHub] flink pull request: [FLINK-3382] Improve clarity of object reuse i...
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1614#issuecomment-182025617 Shouldn't the current record remain valid if `hasNext()` returned true? I mean the user might be holding on to the object returned in `next`, and expect it to not be changed by a `hasNext` call: ``` T cur = it.next(); if(it.hasNext()) { // here, I would expect cur to not have changed since the next() call } ```
[jira] [Commented] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
[ https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139530#comment-15139530 ] ASF GitHub Bot commented on FLINK-3382: --- Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/1614#issuecomment-182025617 Shouldn't the current record remain valid if `hasNext()` returned true? I mean the user might be holding on to the object returned in `next`, and expect it to not be changed by a `hasNext` call: ``` T cur = it.next(); if(it.hasNext()) { // here, I would expect cur to not have changed since the next() call } ``` > Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper > - > > Key: FLINK-3382 > URL: https://issues.apache.org/jira/browse/FLINK-3382 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be > clarified by creating a single object and storing the iterator's next value > into the second reference. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
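The hazard ggevay raises can be reproduced with a toy single-holder iterator (a hypothetical sketch, not the actual ReusingMutableToRegularIteratorWrapper; the real wrapper keeps a second reference precisely to avoid this): if `hasNext()` prefetches into the same instance `next()` handed out, the caller's record changes under it.

```java
import java.util.Iterator;

public class PrefetchSketch {
    static class Holder { int v; }

    /** Broken variant: hasNext() prefetches into the SAME instance next() handed out. */
    static class OneHolderIterator implements Iterator<Holder> {
        private final int[] data;
        private final Holder holder = new Holder();
        private int pos = 0;
        private boolean prefetched = false;

        OneHolderIterator(int[] data) { this.data = data; }

        @Override
        public boolean hasNext() {
            if (!prefetched && pos < data.length) {
                holder.v = data[pos++];   // clobbers the record returned by next()
                prefetched = true;
            }
            return prefetched;
        }

        @Override
        public Holder next() { prefetched = false; return holder; }
    }

    /** Returns what the caller's record contains AFTER a later hasNext() call. */
    static int demo() {
        OneHolderIterator it = new OneHolderIterator(new int[]{1, 2});
        it.hasNext();
        Holder cur = it.next();  // cur.v == 1 here
        it.hasNext();            // prefetch overwrites cur.v with 2
        return cur.v;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 2 -- the record changed under the caller
    }
}
```

Alternating between two holder instances restores the expected behavior, which is why the single-object proposal was withdrawn.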
[jira] [Commented] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
[ https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139441#comment-15139441 ] ASF GitHub Bot commented on FLINK-3382: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/1614 [FLINK-3382] Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 3382_improve_clarity_of_object_reuse_in_ReusingMutableToRegularIteratorWrapper Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1614.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1614 > Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper > - > > Key: FLINK-3382 > URL: https://issues.apache.org/jira/browse/FLINK-3382 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be > clarified by creating a single object and storing the iterator's next value > into the second reference. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3382] Improve clarity of object reuse i...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/1614 [FLINK-3382] Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 3382_improve_clarity_of_object_reuse_in_ReusingMutableToRegularIteratorWrapper Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1614.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1614
[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139435#comment-15139435 ] Gabor Gevay commented on FLINK-3333: One more thing: if we decide to go with including chaining, then please also explain that a "chainable operator" means that it can be chained with the _previous_ operator. > Documentation about object reuse should be improved > --- > > Key: FLINK-3333 > URL: https://issues.apache.org/jira/browse/FLINK-3333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Blocker > Fix For: 1.0.0 > > > The documentation about object reuse \[1\] has several problems, see \[2\]. > \[1\] > https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior > \[2\] > https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable
[ https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139415#comment-15139415 ] ASF GitHub Bot commented on FLINK-2523: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1612#discussion_r52356401 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java --- @@ -102,60 +101,150 @@ // /** -* Constructs a new job graph with no name and a random job ID. +* Constructs a new job graph with no name, a random job ID, and the default +* {@link ExecutionConfig} parameters. */ public JobGraph() { this((String) null); } /** -* Constructs a new job graph with the given name, a random job ID. +* Constructs a new job graph with the given name, a random job ID and the default +* {@link ExecutionConfig} parameters. * * @param jobName The name of the job */ public JobGraph(String jobName) { - this(null, jobName); + this(null, jobName, new ExecutionConfig()); + } + + /** +* Constructs a new job graph with the given job ID, the given name, and the default +* {@link ExecutionConfig} parameters. +* +* @param jobID The id of the job. A random ID is generated, if {@code null} is passed. +* @param jobName The name of the job. +*/ + public JobGraph(JobID jobID, String jobName) { + this(jobID, jobName, new ExecutionConfig()); } /** -* Constructs a new job graph with the given name and a random job ID if null supplied as an id. +* Constructs a new job graph with the given name, the given {@link ExecutionConfig}, +* and a random job ID. +* +* @param jobName The name of the job. +* @param config The execution configuration of the job. +*/ + public JobGraph(String jobName, ExecutionConfig config) { + this(null, jobName, config); + } + + /** +* Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed), +* the given name and the given execution configuration (see {@link ExecutionConfig}). 
* * @param jobId The id of the job. A random ID is generated, if {@code null} is passed. * @param jobName The name of the job. +* @param config The execution configuration of the job. */ - public JobGraph(JobID jobId, String jobName) { + public JobGraph(JobID jobId, String jobName, ExecutionConfig config) { this.jobID = jobId == null ? new JobID() : jobId; this.jobName = jobName == null ? "(unnamed job)" : jobName; + this.executionConfig = config; --- End diff -- Can `executionConfig` be `null`? If not, then we should insert a check here. > Make task canceling interrupt interval configurable > --- > > Key: FLINK-2523 > URL: https://issues.apache.org/jira/browse/FLINK-2523 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Klou > Fix For: 1.0.0 > > > When a task is canceled, the cancellation calls periodically "interrupt()" on > the task thread, if the task thread does not cancel with a certain time. > Currently, this value is hard coded to 10 seconds. We should make that time > configurable. > Until then, I would like to increase the value to 30 seconds, as many tasks > (here I am observing it for Kafka consumers) can take longer then 10 seconds > for proper cleanup. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1612#discussion_r52356401 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java --- @@ -102,60 +101,150 @@ // /** -* Constructs a new job graph with no name and a random job ID. +* Constructs a new job graph with no name, a random job ID, and the default +* {@link ExecutionConfig} parameters. */ public JobGraph() { this((String) null); } /** -* Constructs a new job graph with the given name, a random job ID. +* Constructs a new job graph with the given name, a random job ID and the default +* {@link ExecutionConfig} parameters. * * @param jobName The name of the job */ public JobGraph(String jobName) { - this(null, jobName); + this(null, jobName, new ExecutionConfig()); + } + + /** +* Constructs a new job graph with the given job ID, the given name, and the default +* {@link ExecutionConfig} parameters. +* +* @param jobID The id of the job. A random ID is generated, if {@code null} is passed. +* @param jobName The name of the job. +*/ + public JobGraph(JobID jobID, String jobName) { + this(jobID, jobName, new ExecutionConfig()); } /** -* Constructs a new job graph with the given name and a random job ID if null supplied as an id. +* Constructs a new job graph with the given name, the given {@link ExecutionConfig}, +* and a random job ID. +* +* @param jobName The name of the job. +* @param config The execution configuration of the job. +*/ + public JobGraph(String jobName, ExecutionConfig config) { + this(null, jobName, config); + } + + /** +* Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed), +* the given name and the given execution configuration (see {@link ExecutionConfig}). * * @param jobId The id of the job. A random ID is generated, if {@code null} is passed. * @param jobName The name of the job. +* @param config The execution configuration of the job. 
*/ - public JobGraph(JobID jobId, String jobName) { + public JobGraph(JobID jobId, String jobName, ExecutionConfig config) { this.jobID = jobId == null ? new JobID() : jobId; this.jobName = jobName == null ? "(unnamed job)" : jobName; + this.executionConfig = config; --- End diff -- Can `executionConfig` be `null`? If not, then we should insert a check here.
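Till's suggested null check could look like the following (a plain-Java sketch using `Objects.requireNonNull`; the actual fix might instead use Flink's own precondition utilities, and `Object` here is only a stand-in for `ExecutionConfig`): rejecting a null config at construction fails fast with a clear message instead of an obscure NPE later.

```java
import java.util.Objects;

public class JobGraphSketch {
    private final Object executionConfig; // stand-in for ExecutionConfig

    public JobGraphSketch(Object config) {
        // Fail immediately if the caller passes null, with an explanatory message.
        this.executionConfig = Objects.requireNonNull(config, "ExecutionConfig must not be null");
    }

    public static void main(String[] args) {
        new JobGraphSketch(new Object());   // fine
        try {
            new JobGraphSketch(null);       // fails fast at the constructor
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```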
[jira] [Updated] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
[ https://issues.apache.org/jira/browse/FLINK-3382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3382: -- Description: Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be clarified by creating a single object and storing the iterator's next value into the second reference. (was: Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()} can be clarified by creating a single object and storing the iterator's next value into the second reference.) > Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper > - > > Key: FLINK-3382 > URL: https://issues.apache.org/jira/browse/FLINK-3382 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be > clarified by creating a single object and storing the iterator's next value > into the second reference. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3335) Fix DataSourceTask object reuse when disabled
[ https://issues.apache.org/jira/browse/FLINK-3335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-3335: -- Summary: Fix DataSourceTask object reuse when disabled (was: DataSourceTask object reuse when disabled) > Fix DataSourceTask object reuse when disabled > - > > Key: FLINK-3335 > URL: https://issues.apache.org/jira/browse/FLINK-3335 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > From {{DataSourceTask.invoke()}}: > {code} > if ((returned = format.nextRecord(serializer.createInstance())) != null) { > output.collect(returned); > } > {code} > The returned value ({{returned}}) must be copied rather than creating and > passing in a new instance. The {{InputFormat}} interface only permits the > given object to be used and does not require a new object to be returned > otherwise. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
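The contract violation in the issue can be sketched with plain-Java stand-ins for the InputFormat and record types (hypothetical classes, illustrative only): the format is allowed to ignore the passed-in reuse object and hand back its own internal instance, so passing a fresh instance per call does not guarantee distinct outputs; the returned record must be copied.

```java
import java.util.ArrayList;
import java.util.List;

public class SourceCopySketch {
    static class Rec { int v; }

    /** Stand-in for an InputFormat that ignores the passed-in reuse object and
     *  always returns its own internal record -- which the interface permits. */
    static class Format {
        private final Rec internal = new Rec();
        private int next = 1;
        Rec nextRecord(Rec reuse) {
            if (next > 3) { return null; }
            internal.v = next++;
            return internal;  // same instance on every call
        }
    }

    /** The fix: copy the RETURNED record (Flink would use the serializer's copy),
     *  rather than relying on a fresh instance passed in per call. */
    static List<Rec> readAll() {
        Format format = new Format();
        List<Rec> out = new ArrayList<>();
        Rec returned;
        while ((returned = format.nextRecord(new Rec())) != null) {
            Rec copy = new Rec();
            copy.v = returned.v;
            out.add(copy);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> out = readAll();
        System.out.println(out.get(0).v + " " + out.get(2).v); // 1 3
    }
}
```

Without the copy, every element of `out` would alias the format's internal record and show only the last value read.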
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139332#comment-15139332 ] ASF GitHub Bot commented on FLINK-3226: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52349128 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime.aggregate + +abstract class AvgAggregate[T] extends Aggregate[T] { + +} + +// TinyInt average aggregate return Int as aggregated value. +class TinyIntAvgAggregate extends AvgAggregate[Byte] { + private var sum: Long = 0 + private var count: Int = 0 + + override def initiateAggregate: Unit = { +sum = 0 +count = 0 + } + + override def aggregate(value: Any): Unit = { +count += 1 +sum += value.asInstanceOf[Byte] --- End diff -- What about simply adding `0.5`? 
> Translate optimized logical Table API plans into physical plans representing > DataSet programs > - > > Key: FLINK-3226 > URL: https://issues.apache.org/jira/browse/FLINK-3226 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Fabian Hueske >Assignee: Chengxiang Li > > This issue is about translating an (optimized) logical Table API (see > FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 > representation of the DataSet program that will be executed. This means: > - Each Flink RelNode refers to exactly one Flink DataSet or DataStream > operator. > - All (join and grouping) keys of Flink operators are correctly specified. > - The expressions which are to be executed in user-code are identified. > - All fields are referenced with their physical execution-time index. > - Flink type information is available. > - Optional: Add physical execution hints for joins > The translation should be the final part of Calcite's optimization process. > For this task we need to: > - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one > Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all > relevant operator information (keys, user-code expression, strategy hints, > parallelism). > - implement rules to translate optimized Calcite RelNodes into Flink > RelNodes. We start with a straight-forward mapping and later add rules that > merge several relational operators into a single Flink operator, e.g., merge > a join followed by a filter. Timo implemented some rules for the first SQL > implementation which can be used as a starting point. > - Integrate the translation rules into the Calcite optimization process -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1600#discussion_r52349128 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime.aggregate + +abstract class AvgAggregate[T] extends Aggregate[T] { + +} + +// TinyInt average aggregate return Int as aggregated value. +class TinyIntAvgAggregate extends AvgAggregate[Byte] { + private var sum: Long = 0 + private var count: Int = 0 + + override def initiateAggregate: Unit = { +sum = 0 +count = 0 + } + + override def aggregate(value: Any): Unit = { +count += 1 +sum += value.asInstanceOf[Byte] --- End diff -- What about simply adding `0.5`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
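The truncation-vs-rounding difference behind the "adding 0.5" suggestion can be shown with a hypothetical helper (not the PR's actual AvgAggregate code): plain integer division truncates toward zero, while adding 0.5 before the cast rounds half-up for non-negative sums (negative sums would need the opposite adjustment).

```java
public class AvgSketch {
    // Truncating average: 7 / 2 == 3
    static int truncatedAvg(long sum, int count) {
        return (int) (sum / count);
    }

    // Half-up rounding for non-negative sums: 7 / 2.0 + 0.5 == 4.0 -> 4
    static int roundedAvg(long sum, int count) {
        return (int) ((double) sum / count + 0.5);
    }

    public static void main(String[] args) {
        System.out.println(truncatedAvg(7, 2)); // 3
        System.out.println(roundedAvg(7, 2));   // 4
    }
}
```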
[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139297#comment-15139297 ] ASF GitHub Bot commented on FLINK-3187: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1470#issuecomment-181981959 If nobody objects, then I would like to merge this PR, since it will give us more flexibility in the future with respect to restarting strategies. > Decouple restart strategy from ExecutionGraph > - > > Key: FLINK-3187 > URL: https://issues.apache.org/jira/browse/FLINK-3187 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > Currently, the {{ExecutionGraph}} supports the following restart logic: > Whenever a failure occurs and the number of restart attempts aren't depleted, > wait for a fixed amount of time and then try to restart. This behaviour can > be controlled by the configuration parameters {{execution-retries.default}} > and {{execution-retries.delay}}. > I propose to decouple the restart logic from the {{ExecutionGraph}} a bit by > introducing a strategy pattern. That way it would not only allow us to define > a job specific restart behaviour but also to implement different restart > strategies. Conceivable strategies could be: Fixed timeout restart, > exponential backoff restart, partial topology restarts, etc. > This change is a preliminary step towards having a restart strategy which > will scale the parallelism of a job down in case that not enough slots are > available.
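The strategy pattern the issue proposes can be sketched roughly as follows (hypothetical names and interface, not Flink's eventual `RestartStrategy` API): the restart decision moves out of the ExecutionGraph and behind an interface, so fixed-delay and exponential-backoff behaviours become interchangeable implementations.

```java
// Illustrative sketch of the proposed strategy pattern; all names here
// are hypothetical and not taken from Flink's actual code.
public class RestartStrategies {

    interface RestartStrategy {
        // Delay in milliseconds before the given restart attempt,
        // or -1 if no further restart is allowed.
        long delayBeforeAttempt(int attempt);
    }

    // Fixed delay with a bounded number of attempts, mirroring the
    // current execution-retries.default / execution-retries.delay logic.
    static class FixedDelayRestart implements RestartStrategy {
        final int maxAttempts;
        final long delayMillis;

        FixedDelayRestart(int maxAttempts, long delayMillis) {
            this.maxAttempts = maxAttempts;
            this.delayMillis = delayMillis;
        }

        public long delayBeforeAttempt(int attempt) {
            return attempt < maxAttempts ? delayMillis : -1;
        }
    }

    // Exponential backoff, one of the "conceivable strategies" named above.
    static class ExponentialBackoffRestart implements RestartStrategy {
        final long initialDelayMillis;

        ExponentialBackoffRestart(long initialDelayMillis) {
            this.initialDelayMillis = initialDelayMillis;
        }

        public long delayBeforeAttempt(int attempt) {
            return initialDelayMillis << attempt; // doubles with each attempt
        }
    }
}
```

With this separation, a job-specific strategy is just another implementation handed to the scheduler, which is the flexibility the comment refers to.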
[GitHub] flink pull request: [FLINK-3187] Introduce RestartStrategy to deco...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1470#issuecomment-181981959 If nobody objects, then I would like to merge this PR, since it will give us more flexibility in the future with respect to restarting strategies.
[GitHub] flink pull request: [FLINK-3260] [runtime] Enforce terminal state ...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/1613 [FLINK-3260] [runtime] Enforce terminal state of Executions This commit fixes the problem that Executions could leave their terminal state FINISHED to transition to FAILED. Such a transition will be propagated to the ExecutionGraph where it entails JobStatus changes. Since the Execution already reached a terminal state, it should not again affect the ExecutionGraph. This can lead to an inconsistent state in case of a restart where the old Executions get disassociated from the ExecutionGraph. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixCallbacks Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1613.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1613 commit cb2a2fb6d3dde5e248e6153e849c8f07d241a10d Author: Till Rohrmann Date: 2016-02-09T09:30:12Z [FLINK-3260] [runtime] Enforce terminal state of Executions This commit fixes the problem that Executions could leave their terminal state FINISHED to transition to FAILED. Such a transition will be propagated to the ExecutionGraph where it entails JobStatus changes. Since the Execution already reached a terminal state, it should not again affect the ExecutionGraph. This can lead to an inconsistent state in case of a restart where the old Executions get disassociated from the ExecutionGraph.
[jira] [Commented] (FLINK-3260) ExecutionGraph gets stuck in state FAILING
[ https://issues.apache.org/jira/browse/FLINK-3260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139293#comment-15139293 ] ASF GitHub Bot commented on FLINK-3260: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/1613 [FLINK-3260] [runtime] Enforce terminal state of Executions This commit fixes the problem that Executions could leave their terminal state FINISHED to transition to FAILED. Such a transition will be propagated to the ExecutionGraph where it entails JobStatus changes. Since the Execution already reached a terminal state, it should not again affect the ExecutionGraph. This can lead to an inconsistent state in case of a restart where the old Executions get disassociated from the ExecutionGraph. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixCallbacks Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1613.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1613 commit cb2a2fb6d3dde5e248e6153e849c8f07d241a10d Author: Till Rohrmann Date: 2016-02-09T09:30:12Z [FLINK-3260] [runtime] Enforce terminal state of Executions This commit fixes the problem that Executions could leave their terminal state FINISHED to transition to FAILED. Such a transition will be propagated to the ExecutionGraph where it entails JobStatus changes. Since the Execution already reached a terminal state, it should not again affect the ExecutionGraph. This can lead to an inconsistent state in case of a restart where the old Executions get disassociated from the ExecutionGraph. 
> ExecutionGraph gets stuck in state FAILING > -- > > Key: FLINK-3260 > URL: https://issues.apache.org/jira/browse/FLINK-3260 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 0.10.1 >Reporter: Stephan Ewen >Assignee: Till Rohrmann >Priority: Blocker > Fix For: 1.0.0 > > > It is a bit of a rare case, but the following can currently happen: > 1. Jobs runs for a while, some tasks are already finished. > 2. Job fails, goes to state failing and restarting. Non-finished tasks fail > or are canceled. > 3. For the finished tasks, ask-futures from certain messages (for example > for releasing intermediate result partitions) can fail (timeout) and cause > the execution to go from FINISHED to FAILED > 4. This triggers the execution graph to go to FAILING without ever going > further into RESTARTING again > 5. The job is stuck > It initially looks like this is mainly an issue for batch jobs (jobs where > tasks do finish, rather than run infinitely). > The log that shows how this manifests: > {code} > > 17:19:19,782 INFO akka.event.slf4j.Slf4jLogger >- Slf4jLogger started > 17:19:19,844 INFO Remoting >- Starting remoting > 17:19:20,065 INFO Remoting >- Remoting started; listening on addresses > :[akka.tcp://flink@127.0.0.1:56722] > 17:19:20,090 INFO org.apache.flink.runtime.blob.BlobServer >- Created BLOB server storage directory > /tmp/blobStore-6766f51a-1c51-4a03-acfb-08c2c29c11f0 > 17:19:20,096 INFO org.apache.flink.runtime.blob.BlobServer >- Started BLOB server at 0.0.0.0:43327 - max concurrent requests: 50 - max > backlog: 1000 > 17:19:20,113 INFO org.apache.flink.runtime.jobmanager.MemoryArchivist >- Started memory archivist akka://flink/user/archive > 17:19:20,115 INFO org.apache.flink.runtime.checkpoint.SavepointStoreFactory >- No savepoint state backend configured. Using job manager savepoint state > backend. 
> 17:19:20,118 INFO org.apache.flink.runtime.jobmanager.JobManager >- Starting JobManager at akka.tcp://flink@127.0.0.1:56722/user/jobmanager. > 17:19:20,123 INFO org.apache.flink.runtime.jobmanager.JobManager >- JobManager akka.tcp://flink@127.0.0.1:56722/user/jobmanager was granted > leadership with leader session ID None. > 17:19:25,605 INFO org.apache.flink.runtime.instance.InstanceManager >- Registered TaskManager at > testing-worker-linux-docker-e6d6931f-3200-linux-4 > (akka.tcp://flink@172.17.0.253:43702/user/taskmanager) as > f213232054587f296a12140d56f63ed1. Current number of registered hosts is 1. > Current number of alive task slots is 2. > 17:1
[jira] [Commented] (FLINK-2523) Make task canceling interrupt interval configurable
[ https://issues.apache.org/jira/browse/FLINK-2523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139279#comment-15139279 ] ASF GitHub Bot commented on FLINK-2523: --- GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/1612 FLINK-2523: Makes the task cancellation interval configurable. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink task_cancellation_interval Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1612.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1612 commit cd36d7ec883e69828bcb476d69aba465dca79b8d Author: Aljoscha Krettek Date: 2016-02-03T10:07:38Z [hotfix] Fix typos in Trigger.java commit fdf74f036a96708fae7b8c8a5a2ce041bb7ed20f Author: Kostas Kloudas Date: 2016-02-03T12:58:12Z FLINK-3327: Attaches the ExecutionConfig to the JobGraph and propagates it to the Task itself. commit 5ead1c05215f4fd70f55370f7864674d670b1282 Author: Kostas Kloudas Date: 2016-02-09T14:48:00Z FLINK-2523: Makes the task cancellation interval configurable through the ExecutionConfig. > Make task canceling interrupt interval configurable > --- > > Key: FLINK-2523 > URL: https://issues.apache.org/jira/browse/FLINK-2523 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Klou > Fix For: 1.0.0 > > > When a task is canceled, the cancellation calls periodically "interrupt()" on > the task thread, if the task thread does not cancel with a certain time. > Currently, this value is hard coded to 10 seconds. We should make that time > configurable. > Until then, I would like to increase the value to 30 seconds, as many tasks > (here I am observing it for Kafka consumers) can take longer then 10 seconds > for proper cleanup. 
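The mechanism being made configurable here, periodically calling `interrupt()` on the task thread until it terminates or a deadline passes, can be sketched in plain Java (hypothetical names, not Flink's actual task-cancellation code):

```java
// Standalone illustration of periodic cancellation interrupts; the
// class and method names are invented for this sketch.
public class InterruptingCanceler {

    // Interrupt the task thread every intervalMillis until it dies or
    // maxWaitMillis has elapsed. The interval is the value the issue
    // proposes to make configurable (previously hard coded to 10s).
    static void cancel(Thread taskThread, long intervalMillis, long maxWaitMillis)
            throws InterruptedException {
        long waited = 0;
        while (taskThread.isAlive() && waited < maxWaitMillis) {
            taskThread.interrupt();          // periodic interrupt
            taskThread.join(intervalMillis); // wait one interval for exit
            waited += intervalMillis;
        }
    }
}
```

A longer interval gives tasks such as Kafka consumers more time for cleanup between interrupts, which is the motivation stated in the issue.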
[GitHub] flink pull request: FLINK-2523: Makes the task cancellation interv...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/1612 FLINK-2523: Makes the task cancellation interval configurable. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink task_cancellation_interval Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1612.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1612 commit cd36d7ec883e69828bcb476d69aba465dca79b8d Author: Aljoscha Krettek Date: 2016-02-03T10:07:38Z [hotfix] Fix typos in Trigger.java commit fdf74f036a96708fae7b8c8a5a2ce041bb7ed20f Author: Kostas Kloudas Date: 2016-02-03T12:58:12Z FLINK-3327: Attaches the ExecutionConfig to the JobGraph and propagates it to the Task itself. commit 5ead1c05215f4fd70f55370f7864674d670b1282 Author: Kostas Kloudas Date: 2016-02-09T14:48:00Z FLINK-2523: Makes the task cancellation interval configurable through the ExecutionConfig.
[jira] [Created] (FLINK-3383) Separate Maven deployment from CI testing
Maximilian Michels created FLINK-3383: - Summary: Separate Maven deployment from CI testing Key: FLINK-3383 URL: https://issues.apache.org/jira/browse/FLINK-3383 Project: Flink Issue Type: Improvement Components: Build System, Tests Affects Versions: 1.0.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Priority: Critical We currently handle our tests and the deployment of the Maven artifacts via Travis CI. Travis has a maximum allowed build time of two hours which we reach nearly every time. By that time, the tests have already been run but the deployment is still ongoing. I propose to remove the Maven deployment from Travis. Instead, we could use Apache's Jenkins service or Apache's Buildbot service to trigger a deployment once a day.
[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139219#comment-15139219 ] Gabor Gevay commented on FLINK-3333: OK, this would also be a clear improvement over the current documentation. However, this doesn't discuss the rules about whether I can modify output objects after returning them. This question is far from being trivial (as evidenced by the comment thread in the Google Doc). Do you think that this "edge case" is too insignificant to mention? > I think it is most important that the user documentation be clear and > concise, otherwise it won't be read or will discourage new users. This is why I think that the separation between the non-chained/chained cases should be left out of this. If we are aiming for simplicity here, then I really can't imagine a user meticulously checking whether his operator will be chained, and then writing different code based on this. Other minor issues: > In practice that means that a user function will always receive the same > object instance "it can happen that" should be inserted. See point C) in the "Concrete problems with the current documentation" section in the Google Doc. > User defined functions (like map() or groupReduce()) Should be "like MapFunction or GroupReduceFunction", to avoid confusing new users about what is a user defined function. > Documentation about object reuse should be improved > --- > > Key: FLINK-3333 > URL: https://issues.apache.org/jira/browse/FLINK-3333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Blocker > Fix For: 1.0.0 > > > The documentation about object reuse \[1\] has several problems, see \[2\].
> \[1\] > https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior > \[2\] > https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1595#issuecomment-181955939 Thanks for the quick update @twalthr! Some tests are failing because the wrong type of exception is expected. I'll fix those and then merge this.
[jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139202#comment-15139202 ] ASF GitHub Bot commented on FLINK-3226: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1595#issuecomment-181955939 Thanks for the quick update @twalthr! Some tests are failing because the wrong type of exception is expected. I'll fix those and then merge this. > Translate optimized logical Table API plans into physical plans representing > DataSet programs > - > > Key: FLINK-3226 > URL: https://issues.apache.org/jira/browse/FLINK-3226 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Fabian Hueske >Assignee: Chengxiang Li > > This issue is about translating an (optimized) logical Table API (see > FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 > representation of the DataSet program that will be executed. This means: > - Each Flink RelNode refers to exactly one Flink DataSet or DataStream > operator. > - All (join and grouping) keys of Flink operators are correctly specified. > - The expressions which are to be executed in user-code are identified. > - All fields are referenced with their physical execution-time index. > - Flink type information is available. > - Optional: Add physical execution hints for joins > The translation should be the final part of Calcite's optimization process. > For this task we need to: > - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one > Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all > relevant operator information (keys, user-code expression, strategy hints, > parallelism). > - implement rules to translate optimized Calcite RelNodes into Flink > RelNodes. 
We start with a straight-forward mapping and later add rules that > merge several relational operators into a single Flink operator, e.g., merge > a join followed by a filter. Timo implemented some rules for the first SQL > implementation which can be used as a starting point. > - Integrate the translation rules into the Calcite optimization process
[jira] [Updated] (FLINK-3380) Unstable Test: JobSubmissionFailsITCase
[ https://issues.apache.org/jira/browse/FLINK-3380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Klou updated FLINK-3380: Description: https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt (was: {quote} Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.881 sec <<< FAILURE! - in org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase Time elapsed: 13.04 sec <<< FAILURE! java.lang.AssertionError: Futures timed out after [1 milliseconds] at org.junit.Assert.fail(Assert.java:88) at org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase.teardown(JobSubmissionFailsITCase.java:82) {quote}) > Unstable Test: JobSubmissionFailsITCase > --- > > Key: FLINK-3380 > URL: https://issues.apache.org/jira/browse/FLINK-3380 > Project: Flink > Issue Type: Bug >Reporter: Klou >Priority: Critical > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt
[GitHub] flink pull request: FLINK-3271: Include jetty-util in the dist jar
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1543#discussion_r52336491

--- Diff: flink-shaded-hadoop/pom.xml ---
@@ -111,6 +111,7 @@ under the License.
 io.netty:netty:*
 org.apache.curator:*
 org.apache.hadoop:*
+org.mortbay.jetty:*
--- End diff --

The `jetty-util` will actually be included in the Flink fat jar without this line. This line simply adds the `jetty-util` jar in addition into the `flink-shaded-hadoop1.jar` which is an intermediate artifact, but also deployed to maven. I would like to keep the dependency out of the intermediate artifact.
[jira] [Commented] (FLINK-3271) Using webhdfs in a flink topology throws classnotfound exception
[ https://issues.apache.org/jira/browse/FLINK-3271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139192#comment-15139192 ] ASF GitHub Bot commented on FLINK-3271: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1543#discussion_r52336491 --- Diff: flink-shaded-hadoop/pom.xml --- @@ -111,6 +111,7 @@ under the License. io.netty:netty:* org.apache.curator:* org.apache.hadoop:* + org.mortbay.jetty:* --- End diff -- The `jetty-util` will actually be included in the Flink fat jar without this line. This line simply adds the `jetty-util` jar in addition into the `flink-shaded-hadoop1.jar` which is an intermediate artifact, but also deployed to maven. I would like to keep the dependency out of the intermediate artifact. > Using webhdfs in a flink topology throws classnotfound exception > > > Key: FLINK-3271 > URL: https://issues.apache.org/jira/browse/FLINK-3271 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 0.10.1 >Reporter: Abhishek Agarwal >Assignee: Abhishek Agarwal > > I was just trying to run a storm topology on flink using flink-storm. 
I got > this exception - > {noformat} > Caused by: java.lang.NoClassDefFoundError: org/mortbay/util/ajax/JSON > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem.jsonParse(WebHdfsFileSystem.java:325) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathResponseRunner.getResponse(WebHdfsFileSystem.java:727) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.runWithRetry(WebHdfsFileSystem.java:610) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.access$100(WebHdfsFileSystem.java:458) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner$1.run(WebHdfsFileSystem.java:487) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.run(WebHdfsFileSystem.java:483) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem.listStatus(WebHdfsFileSystem.java:1277) > {noformat} > My topology list some files on hdfs using webhdfs API. > org.mortbay.util.ajax.JSON was included in the application uber jar. I > noticed that flink loads the application jar in a child classloader. This is > what most likely happened - > 1. WebHdfsFileSystem class was loaded through parent class loader since it is > included in flink-dist.jar. > 2. WebHdfsFileSystem has reference to the org.mortbay.util.ajax.JSON but > since it is loaded through parent class loader, WebHdfsFileSystem can't read > a class in child class loader. > Ideally all the referenced classes should be available in the distribution > jar so that these sort of issues may not occur.
[jira] [Created] (FLINK-3382) Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper
Greg Hogan created FLINK-3382: - Summary: Improve clarity of object reuse in ReusingMutableToRegularIteratorWrapper Key: FLINK-3382 URL: https://issues.apache.org/jira/browse/FLINK-3382 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: 1.0.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Minor Object reuse in {{ReusingMutableToRegularIteratorWrapper.hasNext()}} can be clarified by creating a single object and storing the iterator's next value into the second reference.
[jira] [Created] (FLINK-3381) Unstable Test: JobManagerSubmittedJobGraphsRecoveryITCase
Klou created FLINK-3381: --- Summary: Unstable Test: JobManagerSubmittedJobGraphsRecoveryITCase Key: FLINK-3381 URL: https://issues.apache.org/jira/browse/FLINK-3381 Project: Flink Issue Type: Bug Reporter: Klou Priority: Critical https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034634/log.txt
[jira] [Commented] (FLINK-3380) Unstable Test: JobSubmissionFailsITCase
[ https://issues.apache.org/jira/browse/FLINK-3380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139190#comment-15139190 ] Ufuk Celebi commented on FLINK-3380: These should be fixed with https://github.com/apache/flink/pull/1611. > Unstable Test: JobSubmissionFailsITCase > --- > > Key: FLINK-3380 > URL: https://issues.apache.org/jira/browse/FLINK-3380 > Project: Flink > Issue Type: Bug >Reporter: Klou >Priority: Critical > > {quote} > Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.881 sec > <<< FAILURE! - in > org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase > org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase Time elapsed: > 13.04 sec <<< FAILURE! > java.lang.AssertionError: Futures timed out after [1 milliseconds] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase.teardown(JobSubmissionFailsITCase.java:82) > {quote}
[jira] [Created] (FLINK-3380) Unstable Test: JobSubmissionFailsITCase
Klou created FLINK-3380: --- Summary: Unstable Test: JobSubmissionFailsITCase Key: FLINK-3380 URL: https://issues.apache.org/jira/browse/FLINK-3380 Project: Flink Issue Type: Bug Reporter: Klou Priority: Critical {quote} Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.881 sec <<< FAILURE! - in org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase Time elapsed: 13.04 sec <<< FAILURE! java.lang.AssertionError: Futures timed out after [1 milliseconds] at org.junit.Assert.fail(Assert.java:88) at org.apache.flink.test.failingPrograms.JobSubmissionFailsITCase.teardown(JobSubmissionFailsITCase.java:82) {quote}
[jira] [Commented] (FLINK-3375) Allow Watermark Generation in the Kafka Source
[ https://issues.apache.org/jira/browse/FLINK-3375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139159#comment-15139159 ] Zach Cox commented on FLINK-3375: - Like [~shikhar] I will also have event producers on different machines (with independent clocks) sending messages to the same Kafka topic partitions. So events from the same producer are ordered, but in general events in each partition are somewhat out-of-order. I have full control over these producers, and have considered having them emit periodic watermarks. Would be nice to also have the option for the FlinkKafkaConsumer to use watermarks embedded directly in the Kafka topic, instead of just trying to extract them from events. > Allow Watermark Generation in the Kafka Source > -- > > Key: FLINK-3375 > URL: https://issues.apache.org/jira/browse/FLINK-3375 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.0.0 >Reporter: Stephan Ewen > Fix For: 1.0.0 > > > It is a common case that event timestamps are ascending inside one Kafka > Partition. Ascending timestamps are easy for users, because they are handled > by ascending timestamp extraction. > If the Kafka source has multiple partitions per source task, then the records > become out of order before timestamps can be extracted and watermarks can be > generated. > If we make the FlinkKafkaConsumer an event time source function, it can > generate watermarks itself. It would internally implement the same logic as > the regular operators that merge streams, keeping track of event time > progress per partition and generating watermarks based on the current > guaranteed event time progress.
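The per-partition logic the issue describes, tracking event time progress for each partition and emitting the minimum across partitions as the guaranteed progress, can be sketched as a standalone illustration (not the FlinkKafkaConsumer's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-partition watermark tracking: the highest
// timestamp seen in each partition is recorded, and the watermark is
// the minimum of those maxima, since (assuming ascending timestamps
// within each partition) no partition can still deliver an earlier event.
public class PartitionWatermarkTracker {

    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    // Record an event timestamp observed in one partition.
    void onEvent(int partition, long timestamp) {
        maxTimestampPerPartition.merge(partition, timestamp, Math::max);
    }

    // The guaranteed event time progress across all tracked partitions.
    long currentWatermark() {
        return maxTimestampPerPartition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

A slow or idle partition holds the watermark back, which is exactly why the issue argues the source itself, which knows the partitions, is the right place for this logic.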
[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139151#comment-15139151 ] Greg Hogan commented on FLINK-3333: --- Apache Flink programs can be written and configured to reduce the number of object allocations for better performance. User defined functions (like map() or groupReduce()) process many millions or billions of input and output values. Enabling object reuse and processing mutable objects improves performance by lowering demand on the CPU cache and Java garbage collector. Object reuse is disabled by default, with user defined functions generally getting new objects on each call (or through an iterator). In this case it is safe to store references to the objects inside the function (for example, in a List). <'storing values in a list' example> Apache Flink will chain functions to improve performance when sorting is preserved and the parallelism unchanged. The chainable operators are Map, FlatMap, Reduce on full DataSet, GroupCombine on a Grouped DataSet, or a GroupReduce where the user supplied a RichGroupReduceFunction with a combine method. Objects are passed without copying _even when object reuse is disabled_. In the chaining case, the functions in the chain are receiving the same object instances. So the second map() function is receiving the objects the first map() is returning. This behavior can lead to errors when the first map() function keeps a list of all objects and the second mapper is modifying objects. In that case, the user has to manually create copies of the objects before putting them into the list. There is a switch at the ExecutionConfig which allows users to enable the object reuse mode (enableObjectReuse()). For mutable types, Flink will reuse object instances. In practice that means that a user function will always receive the same object instance (with its fields set to new values).
The object reuse mode will lead to better performance because fewer objects are created, but the user has to manually take care of what they are doing with the object references. > Documentation about object reuse should be improved > --- > > Key: FLINK-3333 > URL: https://issues.apache.org/jira/browse/FLINK-3333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Blocker > Fix For: 1.0.0 > > > The documentation about object reuse \[1\] has several problems, see \[2\]. > \[1\] > https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior > \[2\] > https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
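The pitfall the draft documentation describes, a function in a chain (or under object reuse) receiving the same mutable instance over and over, can be illustrated with a small standalone sketch (plain Java, no Flink):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal illustration of the object reuse pitfall: when the runtime
// refills one mutable object for every record, storing the reference
// itself stores the same object N times.
public class ObjectReusePitfall {

    static class Record { int value; }

    // Simulates an object-reusing runtime: one instance is mutated in
    // place for each incoming value before being handed to user code.
    static List<Record> collectWithReuse(int[] values) {
        List<Record> seen = new ArrayList<>();
        Record reused = new Record();
        for (int v : values) {
            reused.value = v;  // the single instance is overwritten
            seen.add(reused);  // BUG: every entry is the same reference
        }
        return seen;
    }

    // The safe variant the text recommends: copy before storing.
    static List<Record> collectWithCopy(int[] values) {
        List<Record> seen = new ArrayList<>();
        Record reused = new Record();
        for (int v : values) {
            reused.value = v;
            Record copy = new Record();
            copy.value = reused.value; // manual copy of the reused object
            seen.add(copy);
        }
        return seen;
    }
}
```

After collecting 1, 2, 3 with reuse, every list entry reports the last value written, while the copying variant preserves each value, which is the manual copying the draft asks users to perform.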
[jira] [Commented] (FLINK-3333) Documentation about object reuse should be improved
[ https://issues.apache.org/jira/browse/FLINK-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139150#comment-15139150 ] Greg Hogan commented on FLINK-3333: --- I think it is most important that the user documentation be clear and concise, otherwise it won't be read or will discourage new users. Whatever edge cases may lurk, object reuse appears to have worked well as users naturally write code. It would be helpful to include a list of chainable operators (from DriverStrategy.java: Map, FlatMap, Reduce on full DataSet, GroupCombine on a Grouped DataSet, or a GroupReduce where the user supplied a RichGroupReduceFunction with a combine method). Those could be linked to the DataSet Transformations page. Some discussion of CopyableValue types such as IntValue, LongValue, and StringValue would be nice, along with code samples. I've typed up some edits and will paste into a new comment. I do think that discussion of user prohibitions (no modifying inputs to a filter function, no modifying keyed fields, ...) would be better placed in a separate section since those prohibitions are universally applicable. > Documentation about object reuse should be improved > --- > > Key: FLINK-3333 > URL: https://issues.apache.org/jira/browse/FLINK-3333 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Blocker > Fix For: 1.0.0 > > > The documentation about object reuse \[1\] has several problems, see \[2\]. > \[1\] > https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior > \[2\] > https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
[jira] [Commented] (FLINK-3271) Using webhdfs in a flink topology throws classnotfound exception
[ https://issues.apache.org/jira/browse/FLINK-3271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139132#comment-15139132 ] ASF GitHub Bot commented on FLINK-3271: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/flink/pull/1543#discussion_r52330385 --- Diff: flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml --- @@ -184,10 +184,10 @@ under the License. org.mortbay.jetty jsp-2.1 - + > Key: FLINK-3271 > URL: https://issues.apache.org/jira/browse/FLINK-3271 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 0.10.1 >Reporter: Abhishek Agarwal >Assignee: Abhishek Agarwal > > I was just trying to run a storm topology on flink using flink-storm. I got > this exception - > {noformat} > Caused by: java.lang.NoClassDefFoundError: org/mortbay/util/ajax/JSON > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem.jsonParse(WebHdfsFileSystem.java:325) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathResponseRunner.getResponse(WebHdfsFileSystem.java:727) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.runWithRetry(WebHdfsFileSystem.java:610) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.access$100(WebHdfsFileSystem.java:458) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner$1.run(WebHdfsFileSystem.java:487) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem$AbstractRunner.run(WebHdfsFileSystem.java:483) > at > org.apache.hadoop.hdfs.web.WebHdfsFileSystem.listStatus(WebHdfsFileSystem.java:1277) > {noformat} > My topology lists some files on hdfs using the webhdfs API. > org.mortbay.util.ajax.JSON was included in the application uber jar. I > noticed that flink loads the application jar in a child classloader. This is > what most likely happened - > 1. WebHdfsFileSystem class was loaded through the parent class loader since it is > included in flink-dist.jar. > 2. WebHdfsFileSystem has a reference to org.mortbay.util.ajax.JSON but, > since it is loaded through the parent class loader, WebHdfsFileSystem can't see > a class in the child class loader. > Ideally all the referenced classes should be available in the distribution > jar so that these sorts of issues do not occur. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-3271: Include jetty-util in the dist jar
Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/flink/pull/1543#discussion_r52330359 --- Diff: flink-shaded-hadoop/pom.xml --- @@ -111,6 +111,7 @@ under the License. io.netty:netty:* org.apache.curator:* org.apache.hadoop:* + org.mortbay.jetty:* --- End diff -- It is a point solution only for webhdfs API. In code path of webhdfs API, hadoop classes call jetty-util classes. Since jetty-utils classes are not available in "flink class loader", call fails. Even if I pack jetty-util classes in my application jar, call still fails since application jar is in "different classloader". There are two ways to solve - 1. Relocate hadoop classes in flink fat jar. Now the webhdfs call will go through hadoop classes packed in application jar, and not the flink jar. 2. Include jetty-util classes in flink fat jar. This is what I am doing in this PR. Hope I have clarified the problem --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
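The classloading issue behind both options comes down to parent delegation: a class defined by the parent loader resolves its references in the parent, never in the child that holds the application jar. Option 1 effectively asks for child-first loading; a generic sketch of such a loader (illustrative only, not Flink's or Hadoop's actual implementation) looks like this:

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Sketch of a child-first class loader: tries its own URLs before delegating to the parent. */
class ChildFirstClassLoader extends URLClassLoader {
    ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name); // look in the child (application jar) first
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false); // fall back to the parent
                }
            }
            // NOTE: a production loader must always delegate java.* (and framework
            // packages) to the parent; here the empty-child fallback covers that.
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

With such a loader, WebHdfsFileSystem and org.mortbay.util.ajax.JSON would both resolve in the same (child) loader; option 2 instead makes the parent self-sufficient by shipping jetty-util in the fat jar.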
[jira] [Comment Edited] (FLINK-3356) JobClientActorRecoveryITCase.testJobClientRecovery
[ https://issues.apache.org/jira/browse/FLINK-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139116#comment-15139116 ] Klou edited comment on FLINK-3356 at 2/9/16 4:14 PM: - Although it does not refer to the same test, I post it here, as the reason seems to be related to timeouts. So here it goes, more unstable tests: https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034634/log.txt was (Author: kkl0u): More unstable tests (timeouts seem to be the reason) https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034634/log.txt > JobClientActorRecoveryITCase.testJobClientRecovery > -- > > Key: FLINK-3356 > URL: https://issues.apache.org/jira/browse/FLINK-3356 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: test-stability > > https://travis-ci.org/apache/flink/jobs/107597706 > https://travis-ci.org/mjsax/flink/jobs/107597700 > {noformat} > Tests in error: > JobClientActorRecoveryITCase.testJobClientRecovery:135 » Timeout Futures > timed... > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3105) Submission in per job YARN cluster mode reuses properties file of long-lived session
[ https://issues.apache.org/jira/browse/FLINK-3105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139129#comment-15139129 ] Ufuk Celebi commented on FLINK-3105: Do we want to fix this for 1.0? > Submission in per job YARN cluster mode reuses properties file of long-lived > session > > > Key: FLINK-3105 > URL: https://issues.apache.org/jira/browse/FLINK-3105 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 0.10.1 >Reporter: Ufuk Celebi > > Starting a YARN session with `bin/yarn-session.sh` creates a properties file, > which is used to parse job manager information when submitting jobs. > This properties file is also used when submitting a job with {{bin/flink run > -m yarn-cluster}}. The {{yarn-cluster}} mode should actually start a new YARN > session. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3105) Submission in per job YARN cluster mode reuses properties file of long-lived session
[ https://issues.apache.org/jira/browse/FLINK-3105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-3105: --- Fix Version/s: (was: 0.10.2)
[GitHub] flink pull request: Increase test timeouts
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/1611 Increase test timeouts - Increases the Akka ask timeout in test clusters. Because there is no clear hierarchy with these, I added a `setDefaultCiConfig(Configuration)` method to `FlinkMiniCluster`, which is called in the `generateConfiguration(Configuration)` of all sub types. - Increases the ZooKeeper connection timeouts - Logs failures in the retry rule on warn level instead of debug - Fixes a test instability in `JobManagerCheckpointRecoveryITCase` Running 10 builds with these changes, there were still some failures (mostly process test failures). Another new issue seems to be that we have started hitting the 2-hour build limit occasionally. We will still have to monitor the build stability. You can merge this pull request into a Git repository by running: $ git pull https://github.com/uce/flink test-stability Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1611.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1611 commit 5c062101d487f0950ae1b22eb4166bb2564d67e2 Author: Ufuk Celebi Date: 2016-02-08T13:24:43Z [runtime, streaming-connectors, tests] Increase default test Akka ask and ZooKeeper timeouts commit 73b6af7f5565576d440372f6650d467eae008b99 Author: Ufuk Celebi Date: 2016-02-09T10:01:39Z [runtime, tests] Ignore ZooKeeper logs in process tests commit 0854ff7decf3069f13489fcc8b6f88aec5fc5d94 Author: Ufuk Celebi Date: 2016-02-09T10:25:37Z [core] Log retry rule failures on warn level commit 1dc2b61ac93d29da47dbc98025493a052b1627fd Author: Ufuk Celebi Date: 2016-02-09T11:45:41Z [tests] Reset state to allow retry on failure
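The retry-rule changes in that PR (log each failure at warn level, reset state so the next attempt starts clean) follow a pattern like the following generic sketch — `Retry` here is illustrative, not Flink's actual test rule:

```java
import java.util.concurrent.Callable;

/** Runs a task up to maxAttempts times, logging and resetting state between failures. */
class Retry {
    static <T> T retry(int maxAttempts, Runnable resetState, Callable<T> task) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                // warn-level logging makes flaky attempts visible in CI logs
                System.err.println("Attempt " + attempt + " failed: " + e.getMessage());
                resetState.run(); // clear leftover state so the next attempt starts clean
            }
        }
        throw last;
    }
}
```

Resetting state between attempts is the fix applied to `JobManagerCheckpointRecoveryITCase` above: without it, a retry inherits the partial state of the failed run and fails for a different reason.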
[jira] [Commented] (FLINK-3271) Using webhdfs in a flink topology throws classnotfound exception
[ https://issues.apache.org/jira/browse/FLINK-3271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139114#comment-15139114 ] ASF GitHub Bot commented on FLINK-3271: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1543#discussion_r52328751 --- Diff: flink-shaded-hadoop/pom.xml --- @@ -111,6 +111,7 @@ under the License. io.netty:netty:* org.apache.curator:* org.apache.hadoop:* + org.mortbay.jetty:* --- End diff -- This PR does not change that the classes are not hidden. It only puts them into the prepared Hadoop dependency for Flink (which later goes into the Flink fat jar).
[jira] [Commented] (FLINK-3271) Using webhdfs in a flink topology throws classnotfound exception
[ https://issues.apache.org/jira/browse/FLINK-3271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139108#comment-15139108 ] ASF GitHub Bot commented on FLINK-3271: --- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/flink/pull/1543#discussion_r52327591 --- Diff: flink-shaded-hadoop/pom.xml --- @@ -111,6 +111,7 @@ under the License. io.netty:netty:* org.apache.curator:* org.apache.hadoop:* + org.mortbay.jetty:* --- End diff -- Hadoop classes are being included in the fat jar but are not being hidden. That leads to the error scenario this PR intends to solve.