[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34662511

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     if (!isConnected) {
       log.debug(s"Dropping message $message because the TaskManager is currently " +
         "not connected to a JobManager.")
-    }
+    } else {
-    // we order the messages by frequency, to make sure the code paths for matching
-    // are as short as possible
-    message match {
+      // we order the messages by frequency, to make sure the code paths for matching
+      // are as short as possible
+      message match {
+
+        // tell the task about the availability of a new input partition
+        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
+          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+
+        // tell the task about the availability of some new input partitions
+        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+          updateTaskInputPartitions(executionID, partitionInfos)
+
+        // discards intermediate result partitions of a task execution on this TaskManager
+        case FailIntermediateResultPartitions(executionID) =>
+          log.info("Discarding the results produced by task execution " + executionID)
+          if (network.isAssociated) {
+            try {
+              network.getPartitionManager.releasePartitionsProducedBy(executionID)
+            } catch {
+              case t: Throwable => killTaskManagerFatal(
+                "Fatal leak: Unable to release intermediate result partition data", t)
+            }
+          }
-      // tell the task about the availability of a new input partition
-      case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
-        updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+        // notifies the TaskManager that the state of a task has changed.
+        // the TaskManager informs the JobManager and cleans up in case the transition
+        // was into a terminal state, or in case the JobManager cannot be informed of the
+        // state transition
-      // tell the task about the availability of some new input partitions
-      case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
-        updateTaskInputPartitions(executionID, partitionInfos)
+        case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
-      // discards intermediate result partitions of a task execution on this TaskManager
-      case FailIntermediateResultPartitions(executionID) =>
-        log.info("Discarding the results produced by task execution " + executionID)
-        if (network.isAssociated) {
-          try {
-            network.getPartitionManager.releasePartitionsProducedBy(executionID)
-          } catch {
-            case t: Throwable => killTaskManagerFatal(
-              "Fatal leak: Unable to release intermediate result partition data", t)
-          }
-        }
+          // we receive these from our tasks and forward them to the JobManager
--- End diff --

Are we sure nothing changed there in the logic? We have spent so much time getting this delicate thing right that I am always extra careful when I see changes there.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
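Reading only the quoted hunk, the `} else {` change does alter behavior: in the old version the `if (!isConnected)` block only logs, and nothing shown in the hunk returns early, so the `message match` below it still runs. A minimal Java sketch of the two control-flow shapes follows; the class and method names are hypothetical stand-ins for the Scala TaskManager code, and it assumes no early return exists outside the quoted lines.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the guard in the quoted TaskManager hunk.
// Only the control flow is modeled; none of these names exist in Flink.
class MessageHandler {
    private final boolean connected;
    final List<String> log = new ArrayList<>();
    final List<String> dispatched = new ArrayList<>();

    MessageHandler(boolean connected) {
        this.connected = connected;
    }

    // Old shape: the guard only logs. With no else branch and no early return,
    // the dispatch below still runs for a message that was logged as "dropped".
    void handleWithoutElse(String message) {
        if (!connected) {
            log.add("Dropping message " + message);
        }
        dispatched.add(message); // stands in for `message match { ... }`
    }

    // New shape: dispatch happens only when connected.
    void handleWithElse(String message) {
        if (!connected) {
            log.add("Dropping message " + message);
        } else {
            dispatched.add(message);
        }
    }
}
```

With the `else`, a message that is logged as dropped is actually not dispatched.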
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34662919 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { --- End diff -- I'm absolutely sure but I understand your worries. I'll exclude it from this pull request and open a separate pull request.
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34663165 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { --- End diff -- If you are absolutely sure, it is fine.
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34662347

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Main accumulator registry which encapsulates internal and user-defined accumulators.
+ */
+public class AccumulatorRegistry {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(AccumulatorRegistry.class);
+
+	protected final JobID jobID;
+	protected final ExecutionAttemptID taskID;
+
+	/* Flink's internal Accumulator values stored for the executing task. */
+	private final Map<Metric, Accumulator<?, ?>> flinkAccumulators =
+			new HashMap<Metric, Accumulator<?, ?>>();
+
+	/* User-defined Accumulator values stored for the executing task. */
+	private final Map<String, Accumulator<?, ?>> userAccumulators =
+			Collections.synchronizedMap(new HashMap<String, Accumulator<?, ?>>());
+
+	/* The reporter returned to reporting tasks. */
+	private final ReadWriteReporter reporter;
+
+	/**
+	 * Flink metrics supported
+	 */
+	public enum Metric {
+		NUM_RECORDS_IN,
+		NUM_RECORDS_OUT,
+		NUM_BYTES_IN,
+		NUM_BYTES_OUT
+	}
+
+
+	public AccumulatorRegistry(JobID jobID, ExecutionAttemptID taskID) {
+		this.jobID = jobID;
+		this.taskID = taskID;
+		this.reporter = new ReadWriteReporter(flinkAccumulators);
+	}
+
+	/**
+	 * Creates a snapshot of this accumulator registry.
+	 * @return a serialized accumulator map
+	 */
+	public AccumulatorSnapshot getSnapshot() {
+		try {
+			return new AccumulatorSnapshot(jobID, taskID, flinkAccumulators, userAccumulators);
+		} catch (IOException e) {
+			LOG.warn("Failed to serialize accumulators for task.", e);
+			return null;
+		}
+	}
+
+	/**
+	 * Gets the map for user-defined accumulators.
+	 */
+	public Map<String, Accumulator<?, ?>> getUserMap() {
+		return userAccumulators;
+	}
+
+	/**
+	 * Gets the reporter for flink internal metrics.
+	 */
+	public Reporter getReadWriteReporter() {
+		return reporter;
+	}
+
+	/**
+	 * Interface for Flink's internal accumulators.
+	 */
+	public interface Reporter {
+		void report(Metric metric, long value);
+	}
+
+	/**
+	 * Accumulator based reporter for keeping track of internal metrics (e.g. bytes and records in/out)
+	 */
+	public static class ReadWriteReporter implements Reporter {
+
+		private LongCounter numRecordsIn = new LongCounter();
+		private LongCounter numRecordsOut = new LongCounter();
+		private LongCounter numBytesIn = new LongCounter();
+		private LongCounter numBytesOut = new LongCounter();
+
+		public ReadWriteReporter(Map<Metric, Accumulator<?,?>> accumulatorMap) {
+			accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn);
+			accumulatorMap.put(Metric.NUM_RECORDS_OUT, numRecordsOut);
+			accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn);
+
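The registry above keeps Flink's internal metrics in an enum-keyed map and hands tasks a `Reporter` that adds to the matching counter. The quoted diff is cut off before `ReadWriteReporter.report(...)`, so how it dispatches on the metric is an assumption here. A simplified, self-contained sketch of the pattern; `SimpleLongCounter` and `SketchRegistry` are hypothetical stand-ins for Flink's `LongCounter` and `AccumulatorRegistry`:

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of the quoted AccumulatorRegistry; not Flink code.
enum Metric { NUM_RECORDS_IN, NUM_RECORDS_OUT, NUM_BYTES_IN, NUM_BYTES_OUT }

interface Reporter {
    void report(Metric metric, long value);
}

// Stand-in for Flink's LongCounter: a plain, non-thread-safe long accumulator.
class SimpleLongCounter {
    private long value;
    void add(long delta) { value += delta; }
    long getLocalValue() { return value; }
}

class SketchRegistry {
    // Internal metrics keyed by enum; user accumulators keyed by name.
    private final Map<Metric, SimpleLongCounter> flinkCounters = new EnumMap<>(Metric.class);
    private final Map<String, Object> userAccumulators =
            Collections.synchronizedMap(new HashMap<String, Object>());

    SketchRegistry() {
        // Register one counter per metric up front, as ReadWriteReporter's constructor does.
        for (Metric m : Metric.values()) {
            flinkCounters.put(m, new SimpleLongCounter());
        }
    }

    // The reporter handed to readers/writers simply adds to the matching counter.
    Reporter getReadWriteReporter() {
        return (metric, value) -> flinkCounters.get(metric).add(value);
    }

    long get(Metric metric) {
        return flinkCounters.get(metric).getLocalValue();
    }

    Map<String, Object> getUserMap() {
        return userAccumulators;
    }
}
```

Keeping a single `report(Metric, long)` method means new metrics only need a new enum constant, which fits the "long reporter is sufficient for now" reasoning later in the thread.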
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34665546 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { --- End diff -- I've opened a separate pull request #914. We can discuss there whether the change is necessary or not.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34674005 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java --- @@ -0,0 +1,144 @@
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/896
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-121605629 Thanks for the feedback @StephanEwen @uce. Will merge this once Travis passes.
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-121173161

> Wasn't the rationale to have a separate message for the heartbeat and the accumulators to keep the heartbeat messages small? What do you think? In any case, I agree that it makes sense to be able to configure this.

Initially, that was my proposal, but I remember we discussed that we should try to minimize the number of messages and thus use the `Heartbeat` message. It makes sense to be able to configure the snapshot transfer in multiples of the heartbeat interval. I'd like to fix that in another pull request.

> I would add a unit test for the record reader/writer accumulator results.

I'll integrate that into the `AccumulatorLiveITCase`.

> There seems to be a 1:1 correspondence between the reporter and the internal metrics. Is the idea to have multiple different reporters in the future (for different types of metrics etc.) or is a long reporter sufficient?

For now, a long reporter is sufficient, but if we discover that we need more customizable reporters, we can easily change the reporter interface in the future (it wouldn't be API breaking).
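Configuring the snapshot transfer in multiples of the heartbeat interval could look roughly like the sketch below. `SnapshotSchedule` and its `heartbeatsPerSnapshot` parameter are hypothetical, not an existing Flink class or configuration key:

```java
// Hypothetical helper: decide whether the current heartbeat should also carry
// an accumulator snapshot, sending one every N heartbeats.
class SnapshotSchedule {
    private final int heartbeatsPerSnapshot;
    private long heartbeatCount;

    SnapshotSchedule(int heartbeatsPerSnapshot) {
        if (heartbeatsPerSnapshot < 1) {
            throw new IllegalArgumentException("heartbeatsPerSnapshot must be >= 1");
        }
        this.heartbeatsPerSnapshot = heartbeatsPerSnapshot;
    }

    // Called once per heartbeat; returns true if this heartbeat should carry a snapshot.
    boolean onHeartbeat() {
        heartbeatCount++;
        return heartbeatCount % heartbeatsPerSnapshot == 0;
    }
}
```

With `heartbeatsPerSnapshot = 1`, every heartbeat carries a snapshot, which matches piggybacking on each `Heartbeat` message as described above.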
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34547975

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
@@ -408,14 +408,16 @@ public long count() throws Exception {
 		JobExecutionResult res = getExecutionEnvironment().execute();
 		ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
-		try {
-			return SerializedListAccumulator.deserializeList(accResult, serializer);
-		}
-		catch (ClassNotFoundException e) {
-			throw new RuntimeException("Cannot find type class of collected data type.", e);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Serialization error while deserializing collected data", e);
+		if (accResult != null) {
+			try {
+				return SerializedListAccumulator.deserializeList(accResult, serializer);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Cannot find type class of collected data type.", e);
+			} catch (IOException e) {
+				throw new RuntimeException("Serialization error while deserializing collected data", e);
+			}
+		} else {
+			throw new RuntimeException("The job result did not contain any accumulator data.");
--- End diff --

Then let's change it to "Collect returned no results."
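The agreed behavior of `collect()` when the job result has no entry for the accumulator id can be sketched as below. `CollectResult` is hypothetical, and the `deserializer` function stands in for `SerializedListAccumulator.deserializeList(accResult, serializer)`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: fail with a clear message when no accumulator result exists,
// otherwise delegate to the (stand-in) deserializer.
final class CollectResult {
    private CollectResult() {}

    static <T> List<T> fromAccumulator(ArrayList<byte[]> accResult,
                                       Function<ArrayList<byte[]>, List<T>> deserializer) {
        if (accResult == null) {
            throw new RuntimeException("Collect returned no results.");
        }
        return deserializer.apply(accResult);
    }
}
```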
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34549555

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -400,13 +398,22 @@ class JobManager(protected val flinkConfiguration: Configuration,
       import scala.collection.JavaConverters._
       sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
-    case Heartbeat(instanceID, metricsReport) =>
-      try {
-        log.debug(s"Received hearbeat message from $instanceID.")
-        instanceManager.reportHeartBeat(instanceID, metricsReport)
-      } catch {
-        case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t)
-      }
+    case Heartbeat(instanceID, metricsReport, accumulators) =>
+      log.debug(s"Received hearbeat message from $instanceID.")
+
+      Future {
+        accumulators foreach {
+          case accumulators =>
+              currentJobs.get(accumulators.getJobID) match {
--- End diff --

Yes, that is true. As you said, such a corner case wouldn't cause anything bad, but it would log an error. So let's just do nothing in case of an unknown job ID.
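Silently ignoring snapshots for unknown job IDs (for example, a heartbeat that arrives after its job has already been removed) can be sketched as follows. `HeartbeatAccumulatorHandler`, `JobAccumulators` and the `String` job IDs are hypothetical simplifications of the JobManager's `currentJobs` lookup:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-job state that accumulator snapshots update.
class JobAccumulators {
    long lastRecordsIn;
}

class HeartbeatAccumulatorHandler {
    final Map<String, JobAccumulators> currentJobs = new HashMap<>();

    // Returns true if the snapshot was applied, false if the job id was unknown.
    boolean apply(String jobId, long recordsIn) {
        JobAccumulators job = currentJobs.get(jobId);
        if (job == null) {
            // Unknown (e.g. already finished) job: nothing to update, nothing to log.
            return false;
        }
        job.lastRecordsIn = recordsIn;
        return true;
    }
}
```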
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34552275

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---
@@ -101,9 +101,9 @@
 	/**
 	 * For system internal usage only. Use getAccumulator(...) to obtain a
-	 * accumulator. Use this as read-only.
+	 * accumulator. The returned map must NOT be modified.
 	 */
-	HashMap<String, Accumulator<?, ?>> getAllAccumulators();
+	Map<String, Accumulator<?, ?>> getAllAccumulators();
--- End diff --

Changed the comment and deprecated the method.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34547621

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---
@@ -53,32 +53,34 @@
 	private final ExecutionConfig executionConfig;
-	private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
-
+	private final Map<String, Accumulator<?, ?>> accumulators;
+
 	private final DistributedCache distributedCache;
-
-
+
+
 	public AbstractRuntimeUDFContext(String name,
--- End diff --

Thanks. Will make the cpTasks the last argument.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34547538 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java --- @@ -101,9 +101,9 @@ --- End diff -- No, this is a user-facing method to get the Accumulator map in the RichFunction. I changed the return type to Map and I'm returning a Collections.unmodifiableMap to prevent modifications. I think it makes sense that the user can get a list of registered accumulators and their current status (e.g. in chained operators).
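A read-only view like the one described can be returned with `Collections.unmodifiableMap`: it reflects later changes to the backing map but throws `UnsupportedOperationException` on writes. A minimal sketch; `AccumulatorHolder` is hypothetical and `Long` values stand in for `Accumulator` instances:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder: callers get a live, read-only view of the accumulators.
class AccumulatorHolder {
    private final Map<String, Long> accumulators = new HashMap<>();

    void add(String name, long value) {
        accumulators.put(name, value);
    }

    // The view is not a copy: later add(...) calls are visible through it,
    // but put/remove/clear through the view throw UnsupportedOperationException.
    Map<String, Long> getAllAccumulators() {
        return Collections.unmodifiableMap(accumulators);
    }
}
```

Because the view is live, callers always see the current accumulator state without being able to mutate it.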
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34548050

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java ---
@@ -18,32 +18,60 @@
 package org.apache.flink.runtime.accumulators;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.util.SerializedValue;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
 /**
- * This class encapsulates a map of accumulators for a single job. It is used
+ * This class encapsulates a map of accumulators for a single task. It is used
  * for the transfer from TaskManagers to the JobManager and from the JobManager
  * to the Client.
  */
-public class AccumulatorEvent extends SerializedValue<Map<String, Accumulator<?, ?>>> {
+public class AccumulatorEvent implements Serializable {
-	private static final long serialVersionUID = 8965894516006882735L;
+	private static final long serialVersionUID = 42L;
-	/** JobID for the target job */
 	private final JobID jobID;
+	private final ExecutionAttemptID executionAttemptID;
+	private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+	private final SerializedValue<Map<String, Accumulator<?, ?>>> userAccumulators;
-	public AccumulatorEvent(JobID jobID, Map<String, Accumulator<?, ?>> accumulators) throws IOException {
-		super(accumulators);
+	public AccumulatorEvent(JobID jobID, ExecutionAttemptID executionAttemptID,
--- End diff --

The class is used to transfer the accumulator results from the task manager to the job manager. So it is used in both runtime components. I agree we could give it a better name. I like `AccumulatorSnapshot`.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34504410 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -400,13 +398,22 @@ class JobManager(protected val flinkConfiguration: Configuration, import scala.collection.JavaConverters._ sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala) -case Heartbeat(instanceID, metricsReport) => - try { -log.debug(s"Received hearbeat message from $instanceID.") -instanceManager.reportHeartBeat(instanceID, metricsReport) - } catch { -case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t) - } +case Heartbeat(instanceID, metricsReport, accumulators) => + log.debug(s"Received hearbeat message from $instanceID.") + + Future { +accumulators foreach { + case accumulators => + currentJobs.get(accumulators.getJobID) match { --- End diff -- The map of current jobs is not thread-safe, and here it is read from inside a Future, i.e. outside the actor's thread. I think there could be corner cases where an error is reported although nothing went wrong.
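One way around the thread-safety problem described above is to do every lookup in the jobs map on the actor's own thread and hand only the resolved job objects to the asynchronous part. The following is a minimal Java sketch of that pattern, not the JobManager's actual code: JobState, HeartbeatHandler and onHeartbeat are hypothetical names, String IDs and Long values stand in for JobID and the accumulator payload, and CompletableFuture plays the role of the Scala Future.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the per-job state held by the JobManager.
class JobState {
    final String jobId;
    private long lastReport;

    JobState(String jobId) { this.jobId = jobId; }

    synchronized void update(long value) { lastReport = value; }
    synchronized long lastReport() { return lastReport; }
}

// Hypothetical handler: currentJobs is a plain HashMap, so it is only touched on the calling ("actor") thread.
class HeartbeatHandler {
    private final Map<String, JobState> currentJobs = new HashMap<>();

    void registerJob(String jobId) { currentJobs.put(jobId, new JobState(jobId)); }

    JobState job(String jobId) { return currentJobs.get(jobId); }

    CompletableFuture<Integer> onHeartbeat(Map<String, Long> reportsByJob) {
        // 1) Resolve all lookups synchronously, on the thread that owns currentJobs.
        List<JobState> jobs = new ArrayList<>();
        List<Long> values = new ArrayList<>();
        for (Map.Entry<String, Long> report : reportsByJob.entrySet()) {
            JobState job = currentJobs.get(report.getKey());
            if (job != null) { // unknown or already removed job: skip instead of reporting an error
                jobs.add(job);
                values.add(report.getValue());
            }
        }
        // 2) Only the per-job work runs asynchronously; it never reads currentJobs.
        return CompletableFuture.supplyAsync(() -> {
            for (int i = 0; i < jobs.size(); i++) {
                jobs.get(i).update(values.get(i));
            }
            return jobs.size();
        });
    }
}
```

The design choice is that the asynchronous closure captures only already-resolved objects, each of which guards its own state, so the unsynchronized map never escapes the owning thread.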
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34503354 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java --- @@ -101,9 +101,9 @@ /** * For system internal usage only. Use getAccumulator(...) to obtain a -* accumulator. Use this as read-only. +* accumulator. The returned map must NOT be modified. */ - HashMap<String, Accumulator<?, ?>> getAllAccumulators(); + Map<String, Accumulator<?, ?>> getAllAccumulators(); --- End diff -- The comment says "system internal usage only", but I couldn't find any usages with IntelliJ. Should we remove this method, then? I guess it has been replaced by the getSnapshot method of the registry.
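If getAllAccumulators() is kept rather than removed, the "must NOT be modified" rule could be enforced instead of only documented, by returning a read-only view. A minimal sketch, assuming a hypothetical AccumulatorContext with Long values in place of Accumulator<?, ?>:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal context; the real RuntimeContext stores Accumulator<?, ?> values.
class AccumulatorContext {
    private final Map<String, Long> accumulators = new HashMap<>();

    void addAccumulator(String name, long initialValue) {
        if (accumulators.putIfAbsent(name, initialValue) != null) {
            throw new IllegalArgumentException("Accumulator already exists: " + name);
        }
    }

    /** Read-only view: a caller that tries to modify it gets an exception instead of corrupting internal state. */
    Map<String, Long> getAllAccumulators() {
        return Collections.unmodifiableMap(accumulators);
    }
}
```

Because Collections.unmodifiableMap returns a view rather than a copy, accumulators registered later are still visible through it; only writes through the view are rejected.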
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34504087 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java --- @@ -18,32 +18,60 @@ package org.apache.flink.runtime.accumulators; -import java.io.IOException; -import java.util.Map; - -import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.util.SerializedValue; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; + /** - * This class encapsulates a map of accumulators for a single job. It is used + * This class encapsulates a map of accumulators for a single task. It is used * for the transfer from TaskManagers to the JobManager and from the JobManager * to the Client. */ -public class AccumulatorEvent extends SerializedValue<Map<String, Accumulator<?, ?>>> { +public class AccumulatorEvent implements Serializable { - private static final long serialVersionUID = 8965894516006882735L; + private static final long serialVersionUID = 42L; - /** JobID for the target job */ private final JobID jobID; + private final ExecutionAttemptID executionAttemptID; + private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators; + private final SerializedValue<Map<String, Accumulator<?, ?>>> userAccumulators; - public AccumulatorEvent(JobID jobID, Map<String, Accumulator<?, ?>> accumulators) throws IOException { - super(accumulators); + public AccumulatorEvent(JobID jobID, ExecutionAttemptID executionAttemptID, --- End diff -- Regarding the name of this class: I think the name implies a single entity, not a pair of maps etc. Maybe rename it to something along the lines of AccumulatorSnapshot? getSnapshot is the only method where this is used anyway, no?
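The proposed name fits what the new fields describe: one task's accumulator state at one point in time, held as a pair of maps. Below is a minimal, hypothetical sketch of such an AccumulatorSnapshot value class. Strings stand in for JobID and ExecutionAttemptID, Longs for the Flink-internal accumulators, and a byte[] for the SerializedValue holding the user-defined accumulators; the Metric constants and the roundTrip helper are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the accumulator state of ONE task at one point in time.
class AccumulatorSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    enum Metric { NUM_RECORDS_IN, NUM_RECORDS_OUT, NUM_BYTES_IN, NUM_BYTES_OUT }

    private final String jobId;
    private final String executionAttemptId;
    private final HashMap<Metric, Long> flinkAccumulators;
    private final byte[] serializedUserAccumulators;

    AccumulatorSnapshot(String jobId, String executionAttemptId,
                        Map<Metric, Long> flinkAccumulators, byte[] serializedUserAccumulators) {
        this.jobId = jobId;
        this.executionAttemptId = executionAttemptId;
        this.flinkAccumulators = new HashMap<>(flinkAccumulators); // defensive copy
        this.serializedUserAccumulators = serializedUserAccumulators.clone();
    }

    String getJobId() { return jobId; }
    String getExecutionAttemptId() { return executionAttemptId; }
    Map<Metric, Long> getFlinkAccumulators() { return Collections.unmodifiableMap(flinkAccumulators); }
    byte[] getSerializedUserAccumulators() { return serializedUserAccumulators.clone(); }

    /** Java-serialization round trip, to check that the whole object graph is Serializable. */
    static AccumulatorSnapshot roundTrip(AccumulatorSnapshot snapshot) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(snapshot);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (AccumulatorSnapshot) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

Keeping the user accumulators as bytes mirrors the SerializedValue field in the diff; presumably this lets the receiver defer deserialization until the job's user-code class loader is available.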
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34503752 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java --- @@ -408,14 +408,16 @@ public long count() throws Exception { JobExecutionResult res = getExecutionEnvironment().execute(); ArrayList<byte[]> accResult = res.getAccumulatorResult(id); - try { - return SerializedListAccumulator.deserializeList(accResult, serializer); - } - catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot find type class of collected data type.", e); - } - catch (IOException e) { - throw new RuntimeException("Serialization error while deserializing collected data", e); + if (accResult != null) { + try { + return SerializedListAccumulator.deserializeList(accResult, serializer); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot find type class of collected data type.", e); + } catch (IOException e) { + throw new RuntimeException("Serialization error while deserializing collected data", e); + } + } else { + throw new RuntimeException("The job result did not contain any accumulator data."); --- End diff -- The error message might confuse users, because a user should not have to care that the count is implemented with accumulators. Maybe we should make it explicit in the message that this case is a bug and should not happen.
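A hedged sketch of a message that names the user-facing operation and flags the case as an internal bug, instead of mentioning accumulators. AccumulatorResults and requireResult are hypothetical names, and the wording is only a suggestion:

```java
// Hypothetical helper; not part of the DataSet API.
final class AccumulatorResults {
    private AccumulatorResults() {}

    /** Returns the accumulator result backing an operation such as count() or collect(), or fails loudly. */
    static <T> T requireResult(T result, String operation) {
        if (result == null) {
            // Users never see accumulators directly, so name the operation and flag the case as a bug.
            throw new IllegalStateException("Internal error: the result of " + operation
                    + "() is missing from the finished job. This should not happen; please report it as a bug.");
        }
        return result;
    }
}
```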
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34503586 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java --- @@ -53,32 +53,34 @@ private final ExecutionConfig executionConfig; - private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>(); - + private final Map<String, Accumulator<?, ?>> accumulators; + private final DistributedCache distributedCache; - - + + public AbstractRuntimeUDFContext(String name, --- End diff -- I think a common practice is to have optional arguments last, e.g. the cpTasks.
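A minimal sketch of that convention, using hypothetical simplified types: UdfContext is not the actual AbstractRuntimeUDFContext signature, and Map<String, String> only stands in for the real type of cpTasks. With the optional parameter last, a convenience overload just drops the trailing argument.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified context; not the real AbstractRuntimeUDFContext.
class UdfContext {
    private final String name;
    private final int numParallelSubtasks;
    private final Map<String, Long> accumulators;
    private final Map<String, String> cpTasks; // optional; empty when not given

    // Required arguments first, the optional one last ...
    UdfContext(String name, int numParallelSubtasks, Map<String, Long> accumulators,
               Map<String, String> cpTasks) {
        this.name = Objects.requireNonNull(name);
        this.numParallelSubtasks = numParallelSubtasks;
        this.accumulators = Objects.requireNonNull(accumulators);
        this.cpTasks = Objects.requireNonNull(cpTasks);
    }

    // ... so a convenience overload only has to drop the trailing parameter.
    UdfContext(String name, int numParallelSubtasks, Map<String, Long> accumulators) {
        this(name, numParallelSubtasks, accumulators, Collections.<String, String>emptyMap());
    }

    String getName() { return name; }
    int getNumParallelSubtasks() { return numParallelSubtasks; }
    Map<String, Long> getAccumulators() { return accumulators; }
    Map<String, String> getCpTasks() { return cpTasks; }
}
```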
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34503896 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.accumulators; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + + +/** + * Main accumulator registry which encapsulates internal and user-defined accumulators. + */ +public class AccumulatorRegistry { --- End diff -- Regarding the name of this class: I would either make it explicit in the class-level comment that this is PER task or rename it to something along the lines of TaskAccumulatorRegistry.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-121046784 Nice piece of work! I agree with Stephan's points. I think it was good to address them. :) I've added some minor comments inline. Regarding the high-level comments:
- Wasn't the rationale to have a separate message for the heartbeat and the accumulators to keep the heartbeat messages small? What do you think? In any case, I agree that it makes sense to be able to configure this.
- I would add a unit test for the record reader/writer accumulator results.
- There seems to be a 1:1 correspondence between the reporter and the internal metrics. Is the idea to have multiple different reporters in the future (for different types of metrics etc.) or is a long reporter sufficient?
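If accumulators keep riding on the heartbeat, their size cost could be bounded by attaching them only to every N-th heartbeat, with N configurable. A minimal sketch under that assumption: AccumulatorReportSchedule is a hypothetical helper that the TaskManager would consult once per heartbeat from its single actor thread, so it is deliberately not thread-safe.

```java
// Hypothetical helper: decides which heartbeats carry accumulator snapshots.
class AccumulatorReportSchedule {
    private final int everyNthHeartbeat;
    private long heartbeatCount;

    AccumulatorReportSchedule(int everyNthHeartbeat) {
        if (everyNthHeartbeat < 1) {
            throw new IllegalArgumentException("everyNthHeartbeat must be >= 1");
        }
        this.everyNthHeartbeat = everyNthHeartbeat;
    }

    /** Call once per heartbeat; returns true if this heartbeat should include the accumulators. */
    boolean shouldAttachAccumulators() {
        heartbeatCount++;
        return heartbeatCount % everyNthHeartbeat == 0;
    }
}
```

N = 1 reproduces sending accumulators with every heartbeat. With the 5000 ms HEARTBEAT_INTERVAL shown in the TaskManager diff, N = 2 would mean one accumulator report every 10 seconds.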
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34335824 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java --- @@ -131,6 +133,35 @@ private SerializedValue<StateHandle<?>> operatorState; + /* Lock for updating the accumulators atomically. */ --- End diff -- If you consider comment lines as empty (non-code) lines, then it follows the class's style.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34335830 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -172,13 +173,20 @@ /** The library cache, from which the task can request its required JAR files */ private final LibraryCacheManager libraryCache; - + /** The cache for user-defined files that the invokable requires */ private final FileCache fileCache; - + /** The gateway to the network stack, which handles inputs and produced results */ private final NetworkEnvironment network; + /** The registry of this task which enables live reporting of accumulators */ + private final AccumulatorRegistry accumulatorRegistry; --- End diff -- Yes, absolutely. This is an artifact of a former design.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34335963 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.accumulators; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + + +/** + * Main accumulator registry which encapsulates internal and user-defined accumulators. + */ +public class AccumulatorRegistry { --- End diff -- True. Originally, those two were a bit different. The API could be made the same, but I find keeping the two classes useful because it helps to differentiate the accumulator context by type.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34335420 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -1017,6 +1040,9 @@ object TaskManager { val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds + /* Interval to send accumulators to the job manager */ + val ACCUMULATOR_REPORT_INTERVAL: FiniteDuration = 10 seconds --- End diff -- Removed. The update interval corresponds to the heartbeat interval.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-120302065
> Looks like this change breaks the YARN integration. The YARN WordCount no longer works.
Should be working again now.
> It would be good if the accumulator update interval was configurable. Edit: Is that the same value as the heartbeats?
Yes, that was a design rationale to keep the message count low. We could send the accumulators only with every Nth heartbeat and make N configurable.
> There is a potential modification conflict: Drawing a snapshot for serialization and registering a new accumulator can lead to a ConcurrentModificationException in the drawing of the snapshot.
I conducted tests with concurrent insertions and deletions and found that only concurrent removals cause ConcurrentModificationExceptions. Removals are not allowed for accumulators. Anyway, we could switch to a synchronized or copy-on-write hash map. If we do, I would opt for the latter.
> The naming of the accumulators refers sometimes to flink vs. user-defined, and sometimes to internal vs. external. Can we make this consistent? I actually like the flink vs. user-defined naming better.
Then let's stick to the flink vs. user-defined naming scheme.
> I think the code would be simpler if the registry simply always had a created map for internal and external accumulators. Also, a reporter object would help.
I agree that would be a nicer way of dealing with the API.
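Note that java.util.HashMap documents its iterators as fail-fast for any structural modification, and inserting a new key is one; the check is best-effort, so a test that does not trigger the exception does not rule it out. A minimal sketch of the copy-on-write option, assuming a hypothetical CopyOnWriteAccumulatorMap with Long values instead of Accumulator<?, ?>: writers copy the map, add the entry, and publish the copy through a volatile field, so a snapshot being serialized is never modified underneath.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical copy-on-write registry; Long values stand in for Accumulator<?, ?>.
class CopyOnWriteAccumulatorMap {
    // Readers only ever see a fully built map that is never mutated afterwards.
    private volatile Map<String, Long> current = Collections.emptyMap();

    /** Copy, modify the copy, publish it; synchronized so concurrent writers do not lose each other's entries. */
    synchronized void register(String name, long value) {
        Map<String, Long> copy = new HashMap<>(current);
        copy.put(name, value);
        current = Collections.unmodifiableMap(copy);
    }

    /** Safe to iterate (e.g. for serialization) while other threads keep registering. */
    Map<String, Long> snapshot() {
        return current;
    }
}
```

This only protects the map structure: the accumulator objects inside are still updated in place by the task. java.util.concurrent.ConcurrentHashMap is the other standard option; its iterators are weakly consistent and do not throw ConcurrentModificationException.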
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-120305327 The reporter object has the advantage that it is more easily extensible. At some point we will want to differentiate between locally received bytes, and remotely received bytes, for example. Or include wait/stall times to detect whether a task is throttled by a slower predecessor.
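To make the extensibility argument concrete, here is a hypothetical sketch of a per-task registry that always creates its Flink-internal metrics up front and hands readers and writers a small Reporter object instead of raw counters. Splitting incoming bytes into local and remote, as in the example above, then only touches the Reporter and the Metric enum, not every call site. TaskAccumulatorRegistry, Reporter, getReadWriteReporter and the Metric constants are illustrative names, not Flink's API.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-task registry with a reporter object; names and metrics are illustrative only.
class TaskAccumulatorRegistry {

    enum Metric { NUM_RECORDS_IN, NUM_RECORDS_OUT, NUM_BYTES_IN_LOCAL, NUM_BYTES_IN_REMOTE, NUM_BYTES_OUT }

    /** The narrow interface that record readers/writers would receive instead of raw counters. */
    interface Reporter {
        void reportRecordIn();
        void reportRecordOut();
        void reportBytesIn(long bytes, boolean remote);
        void reportBytesOut(long bytes);
    }

    // Created eagerly with a fixed key set: never null, never structurally modified after construction.
    private final Map<Metric, AtomicLong> flinkMetrics = new EnumMap<>(Metric.class);

    private final Reporter reporter = new Reporter() {
        @Override public void reportRecordIn() { flinkMetrics.get(Metric.NUM_RECORDS_IN).incrementAndGet(); }
        @Override public void reportRecordOut() { flinkMetrics.get(Metric.NUM_RECORDS_OUT).incrementAndGet(); }
        @Override public void reportBytesIn(long bytes, boolean remote) {
            Metric metric = remote ? Metric.NUM_BYTES_IN_REMOTE : Metric.NUM_BYTES_IN_LOCAL;
            flinkMetrics.get(metric).addAndGet(bytes);
        }
        @Override public void reportBytesOut(long bytes) { flinkMetrics.get(Metric.NUM_BYTES_OUT).addAndGet(bytes); }
    };

    TaskAccumulatorRegistry() {
        for (Metric metric : Metric.values()) {
            flinkMetrics.put(metric, new AtomicLong());
        }
    }

    Reporter getReadWriteReporter() { return reporter; }

    /** Point-in-time copy of all Flink-internal metrics, e.g. for the next heartbeat. */
    Map<Metric, Long> snapshot() {
        Map<Metric, Long> copy = new EnumMap<>(Metric.class);
        for (Map.Entry<Metric, AtomicLong> entry : flinkMetrics.entrySet()) {
            copy.put(entry.getKey(), entry.getValue().get());
        }
        return copy;
    }
}
```

Because the key set is fixed at construction and only the AtomicLong values change, taking a snapshot cannot run into a ConcurrentModificationException.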
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34335325 --- Diff: flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java --- @@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception { // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); --- End diff -- Thanks for spotting it.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34335345 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java --- @@ -57,6 +58,12 @@ private final BarrierBuffer barrierBuffer; + /** +* Counters for the number of bytes read and records processed. +*/ + private LongCounter numRecordsRead = null; --- End diff -- Thanks, I didn't know.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34335337 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -172,13 +173,20 @@ /** The library cache, from which the task can request its required JAR files */ private final LibraryCacheManager libraryCache; - + /** The cache for user-defined files that the invokable requires */ private final FileCache fileCache; - + /** The gateway to the network stack, which handles inputs and produced results */ private final NetworkEnvironment network; + /** The registry of this task which enables live reporting of accumulators */ + private final AccumulatorRegistry accumulatorRegistry; + + public AccumulatorRegistry getAccumulatorRegistry() { --- End diff -- I agree.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34335380 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { if (!isConnected) { log.debug(s"Dropping message $message because the TaskManager is currently " + "not connected to a JobManager.") -} +} else { -// we order the messages by frequency, to make sure the code paths for matching -// are as short as possible -message match { + // we order the messages by frequency, to make sure the code paths for matching + // are as short as possible + message match { + +// tell the task about the availability of a new input partition +case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) => + updateTaskInputPartitions(executionID, List((resultID, partitionInfo))) + +// tell the task about the availability of some new input partitions +case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) => + updateTaskInputPartitions(executionID, partitionInfos) + +// discards intermediate result partitions of a task execution on this TaskManager +case FailIntermediateResultPartitions(executionID) => + log.info("Discarding the results produced by task execution " + executionID) + if (network.isAssociated) { +try { + network.getPartitionManager.releasePartitionsProducedBy(executionID) +} catch { + case t: Throwable => killTaskManagerFatal( +"Fatal leak: Unable to release intermediate result partition data", t) +} + } - // tell the task about the availability of a new input partition - case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) => -updateTaskInputPartitions(executionID, List((resultID, partitionInfo))) +// notifies the TaskManager that the state of a task has changed. 
+// the TaskManager informs the JobManager and cleans up in case the transition +// was into a terminal state, or in case the JobManager cannot be informed of the +// state transition - // tell the task about the availability of some new input partitions - case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) => -updateTaskInputPartitions(executionID, partitionInfos) +case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) => - // discards intermediate result partitions of a task execution on this TaskManager - case FailIntermediateResultPartitions(executionID) => -log.info("Discarding the results produced by task execution " + executionID) -if (network.isAssociated) { - try { - network.getPartitionManager.releasePartitionsProducedBy(executionID) - } catch { -case t: Throwable => killTaskManagerFatal( -"Fatal leak: Unable to release intermediate result partition data", t) - } -} + // we receive these from our tasks and forward them to the JobManager --- End diff -- This fixes a bug I discovered while reading through the code: the change prevents messages from being processed when the TaskManager is not connected to the JobManager. If you look at line 307, the code says it drops the message but then continues to process it. If you want, I can open a separate pull request.
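The control-flow problem can be reduced to a few lines. A hypothetical Java sketch (the real TaskManager is a Scala actor, and TaskMessageHandler is an illustrative name): unless the drop branch ends the handler, through an else as in the Scala fix or an early return as here, the message falls through to the dispatch code even though the log says it was dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified handler; not the real TaskManager code.
class TaskMessageHandler {
    private boolean connected;
    final List<String> processed = new ArrayList<>();
    final List<String> dropped = new ArrayList<>();

    void setConnected(boolean connected) { this.connected = connected; }

    void handleTaskMessage(String message) {
        if (!connected) {
            // Log-and-drop must end here; without the early return (the `else` in the Scala fix),
            // control falls through and the message is processed anyway.
            dropped.add(message);
            return;
        }
        processed.add(message); // stands in for the `message match { ... }` dispatch
    }
}
```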
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34335330 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java --- @@ -212,19 +218,19 @@ public InputGate getInputGate(int index) { return inputGates; } - @Override - public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) { - AccumulatorEvent evt; - try { - evt = new AccumulatorEvent(getJobID(), accumulators); - } - catch (IOException e) { - throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e); - } - - ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt); - jobManagerActor.tell(accResult, ActorRef.noSender()); - } +// @Override --- End diff -- Yes.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34335342 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -237,12 +246,13 @@ public Task(TaskDeploymentDescriptor tdd, this.memoryManager = checkNotNull(memManager); this.ioManager = checkNotNull(ioManager); - this.broadcastVariableManager =checkNotNull(bcVarManager); + this.broadcastVariableManager = checkNotNull(bcVarManager); + this.accumulatorRegistry = accumulatorRegistry; this.jobManager = checkNotNull(jobManagerActor); this.taskManager = checkNotNull(taskManagerActor); this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout)); - --- End diff -- No auto-formatting; this just removes redundant tabs in empty lines and adds a missing space.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-120460741 I've addressed your comments in a new commit.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/896#issuecomment-120029960 Looks like this change breaks the YARN integration. The YARN WordCount no longer works.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34268803 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java --- @@ -212,19 +218,19 @@ public InputGate getInputGate(int index) { return inputGates; } - @Override - public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) { - AccumulatorEvent evt; - try { - evt = new AccumulatorEvent(getJobID(), accumulators); - } - catch (IOException e) { - throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e); - } - - ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt); - jobManagerActor.tell(accResult, ActorRef.noSender()); - } +// @Override --- End diff -- This can be properly removed, no?
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34266800 --- Diff: flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java --- @@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception { // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); --- End diff -- This must have been accidentally committed.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/896#discussion_r34269026 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -237,12 +246,13 @@ public Task(TaskDeploymentDescriptor tdd, this.memoryManager = checkNotNull(memManager); this.ioManager = checkNotNull(ioManager); - this.broadcastVariableManager =checkNotNull(bcVarManager); + this.broadcastVariableManager = checkNotNull(bcVarManager); + this.accumulatorRegistry = accumulatorRegistry; this.jobManager = checkNotNull(jobManagerActor); this.taskManager = checkNotNull(taskManagerActor); this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout)); - --- End diff -- Lots of auto-reformats...
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/896#issuecomment-120054333

There is a potential concurrent-modification conflict: drawing a snapshot for serialization while a new accumulator is being registered can throw a ConcurrentModificationException while the snapshot is being drawn.
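A minimal sketch of the conflict described above, assuming the registry keeps its accumulators in a plain `java.util.HashMap`. The classes `SafeRegistry` and `SnapshotConflictDemo` are hypothetical, not code from the pull request, and accumulator values are reduced to `long`s. The demo mutates the map in the middle of an iteration (standing in for a registration from another thread) and shows the fail-fast iterator throwing; the fix copies the map under the same lock that registration takes.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry: snapshots and registrations share one lock. */
class SafeRegistry {
    private final Map<String, Long> accumulators = new HashMap<>();

    /** Registers (or overwrites) an accumulator under the registry lock. */
    synchronized void register(String name, long value) {
        accumulators.put(name, value);
    }

    /** Copies the map under the same lock, so no registration can interleave. */
    synchronized Map<String, Long> snapshot() {
        return new HashMap<>(accumulators);
    }
}

/** Hypothetical demo of the unsafe variant. */
class SnapshotConflictDemo {
    /** Returns true if adding an entry mid-iteration makes the iterator throw. */
    static boolean unsafeSnapshotFails() {
        Map<String, Long> map = new HashMap<>();
        map.put("numRecordsIn", 10L);
        map.put("numBytesIn", 200L);
        try {
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                // stands in for another thread registering a new accumulator
                map.put("userCounter", 1L);
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }
}
```

A `ConcurrentHashMap` would be an alternative: its iterators are weakly consistent and never throw `ConcurrentModificationException`, at the cost of a snapshot that may or may not include a concurrently registered accumulator.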
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/896#discussion_r34273441

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -601,6 +636,11 @@ void markFinished() {
                 }
             }
+            synchronized (accumulatorLock) {
--- End diff --

Ignore this comment. The lock is good :-)
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/896#discussion_r34274318

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -172,13 +173,20 @@
     /** The library cache, from which the task can request its required JAR files */
     private final LibraryCacheManager libraryCache;
-
+
     /** The cache for user-defined files that the invokable requires */
     private final FileCache fileCache;
-
+
     /** The gateway to the network stack, which handles inputs and produced results */
     private final NetworkEnvironment network;
+    /** The registry of this task which enables live reporting of accumulators */
+    private final AccumulatorRegistry accumulatorRegistry;
--- End diff --

Since the AccumulatorRegistry is only used task-internally, and is always retrieved from there, it should be initialized internally. That saves one more constructor parameter and helps with separation of concerns.
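The suggestion amounts to moving the `new` into the task itself. A minimal sketch, assuming hypothetical classes `TaskAccumulatorRegistry` and `SketchTask` (not the real `AccumulatorRegistry` or `Task`, and with accumulators reduced to `long` counters): the task builds its registry in its own constructor and only exposes it through a getter, so no caller has to supply one.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the PR's AccumulatorRegistry, reduced to long counters. */
class TaskAccumulatorRegistry {
    private final String taskName;
    private final Map<String, Long> counters = new HashMap<>();

    TaskAccumulatorRegistry(String taskName) {
        this.taskName = taskName;
    }

    String taskName() {
        return taskName;
    }

    /** Adds delta to the named counter, creating it on first use. */
    synchronized void add(String name, long delta) {
        counters.merge(name, delta, Long::sum);
    }

    /** Returns a copy that is safe to serialize while the task keeps counting. */
    synchronized Map<String, Long> snapshot() {
        return new HashMap<>(counters);
    }
}

/** Hypothetical task: owns its registry instead of receiving it as a constructor argument. */
class SketchTask {
    // built here, so it cannot be shared with or swapped by the caller
    private final TaskAccumulatorRegistry accumulatorRegistry;

    SketchTask(String taskName) {
        this.accumulatorRegistry = new TaskAccumulatorRegistry(taskName);
    }

    /** Accessor for code running inside the task and for heartbeat reporting. */
    TaskAccumulatorRegistry getAccumulatorRegistry() {
        return accumulatorRegistry;
    }
}
```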
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/896#discussion_r34274983

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Main accumulator registry which encapsulates internal and user-defined accumulators.
+ */
+public class AccumulatorRegistry {
--- End diff --

I do not understand the distinction between the implementations of the `Internal` and `External` registries. Judging from the usage pattern, both are accessed and initialized with a hash map: in one case the map is created by the caller, in the other by the registry. I have not found a place where it would not work for the registry to always create the map immediately.
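One way to collapse the two implementations into one, sketched under assumptions: `UnifiedAccumulatorRegistry` and its `Kind` enum are hypothetical names (the enum follows the "flink vs. user-defined" naming discussed in this thread), and values are simplified to `long`s where the real registry holds `Accumulator` objects. The registry eagerly creates one map per kind in its constructor, so callers never pass a map in and no lazy null checks are needed.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: a single registry implementation for both kinds of
 * accumulators. It always creates its maps itself; callers never pass one in.
 */
class UnifiedAccumulatorRegistry {
    /** Naming follows the "flink vs. user-defined" terminology. */
    enum Kind { FLINK, USER }

    private final Map<Kind, Map<String, Long>> accumulators = new EnumMap<>(Kind.class);

    UnifiedAccumulatorRegistry() {
        // eager creation: every kind has a map from the start
        for (Kind kind : Kind.values()) {
            accumulators.put(kind, new HashMap<>());
        }
    }

    /** Adds delta to the named accumulator of the given kind. */
    synchronized void add(Kind kind, String name, long delta) {
        accumulators.get(kind).merge(name, delta, Long::sum);
    }

    /** Copy of one kind's accumulators, e.g. for a heartbeat report. */
    synchronized Map<String, Long> snapshot(Kind kind) {
        return new HashMap<>(accumulators.get(kind));
    }
}
```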
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/896#discussion_r34270017

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java ---
@@ -57,6 +58,12 @@
     private final BarrierBuffer barrierBuffer;
+    /**
+     * Counters for the number of bytes read and records processed.
+     */
+    private LongCounter numRecordsRead = null;
--- End diff --

`null` initializations are actually redundant, since reference fields default to `null` anyway. They still get executed (with both OpenJDK javac and Oracle javac), so they are overhead for no reason.
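A small illustration of the point, using a hypothetical class `NullInitDemo` with `Long` standing in for `LongCounter`: both fields end up `null`, but only the explicit initializer makes javac emit an extra store in the constructor.

```java
/** Hypothetical reader fields: both declarations leave the field null. */
class NullInitDemo {
    // explicit initializer: javac emits an extra null store in the constructor
    private Long explicitlyNull = null;
    // no initializer: the JVM already zeroes the field, so nothing extra is emitted
    private Long implicitlyNull;

    boolean bothNull() {
        return explicitlyNull == null && implicitlyNull == null;
    }
}
```

Running `javap -c NullInitDemo` on the compiled class should show an `aconst_null`/`putfield` pair in the constructor for the first field only.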
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/896#issuecomment-120058469

The naming of the accumulators refers sometimes to flink vs. user-defined, and sometimes to internal vs. external. Can we make this consistent? I actually like the flink vs. user-defined naming better.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/896#discussion_r34271710

--- Diff: flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java ---
@@ -60,6 +60,7 @@ public static void main(String[] args) throws Exception {
         // set up the execution environment
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
--- End diff --

This is probably the reason why the YARN tests are broken...
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/896#discussion_r34272101

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     if (!isConnected) {
       log.debug(s"Dropping message $message because the TaskManager is currently " +
         "not connected to a JobManager.")
-    }
+    } else {
-    // we order the messages by frequency, to make sure the code paths for matching
-    // are as short as possible
-    message match {
+      // we order the messages by frequency, to make sure the code paths for matching
+      // are as short as possible
+      message match {
+
+        // tell the task about the availability of a new input partition
+        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
+          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+
+        // tell the task about the availability of some new input partitions
+        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+          updateTaskInputPartitions(executionID, partitionInfos)
+
+        // discards intermediate result partitions of a task execution on this TaskManager
+        case FailIntermediateResultPartitions(executionID) =>
+          log.info("Discarding the results produced by task execution " + executionID)
+          if (network.isAssociated) {
+            try {
+              network.getPartitionManager.releasePartitionsProducedBy(executionID)
+            } catch {
+              case t: Throwable => killTaskManagerFatal(
+                "Fatal leak: Unable to release intermediate result partition data", t)
+            }
+          }
-      // tell the task about the availability of a new input partition
-      case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
-        updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+        // notifies the TaskManager that the state of a task has changed.
+        // the TaskManager informs the JobManager and cleans up in case the transition
+        // was into a terminal state, or in case the JobManager cannot be informed of the
+        // state transition
-      // tell the task about the availability of some new input partitions
-      case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
-        updateTaskInputPartitions(executionID, partitionInfos)
+        case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
-      // discards intermediate result partitions of a task execution on this TaskManager
-      case FailIntermediateResultPartitions(executionID) =>
-        log.info("Discarding the results produced by task execution " + executionID)
-        if (network.isAssociated) {
-          try {
-            network.getPartitionManager.releasePartitionsProducedBy(executionID)
-          } catch {
-            case t: Throwable => killTaskManagerFatal(
-              "Fatal leak: Unable to release intermediate result partition data", t)
-          }
-        }
+          // we receive these from our tasks and forward them to the JobManager
--- End diff --

There is a lot of changed code here that seems to have been edited without need (it has nothing to do with the accumulators). Since this is pretty sensitive code, I am very hesitant to commit such massive edits. What was the reason for these changes in the first place?
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/896#discussion_r34273000

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -131,6 +133,35 @@
     private SerializedValueStateHandle<?> operatorState;
+    /* Lock for updating the accumulators atomically. */
--- End diff --

Why not follow the style of the rest of the class with respect to empty lines between fields?
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
GitHub user mxm opened a pull request:
https://github.com/apache/flink/pull/896

[FLINK-2292][FLINK-1573] add live per-task accumulators

This refactors the accumulators to accumulate per task execution. The accumulators are reported from the task managers periodically to the job manager via the Heartbeat message. If the execution contains chained tasks, the accumulators are chained as well. The final accumulator results are reported via the UpdateTaskExecutionState message.

The accumulators are now saved in the Execution within the ExecutionGraph. This makes the AccumulatorManager obsolete. It has been removed for now. In the future, we might introduce some caching for the web frontend visualization.

Two types of accumulators are available:

- external (user-defined via the RuntimeContext)
- internal (flink metrics defined in the invokables)

The internal (built-in) metrics are targeted at users who want to monitor their programs, e.g. through the job manager's web frontend.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink live-accumulators

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/896.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #896

commit 7cec1236f087e72b40022bf02a6dbb12d74acbac
Author: Maximilian Michels m...@apache.org
Date: 2015-07-08T07:23:42Z

    [FLINK-2292][FLINK-1573] add live per-task accumulators
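The reporting scheme in the description (periodic snapshots on each heartbeat, a final snapshot with the terminal state update, values stored per Execution) could look roughly like the following on the JobManager side. This is a sketch under assumptions, not the pull request's code: `ExecutionAccumulatorStore` is hypothetical, execution IDs are plain strings, values are `long`s, and ignoring heartbeats that arrive after the final report is an assumed design choice.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical JobManager-side store: keeps the latest accumulator snapshot per
 * task execution, as delivered by periodic heartbeats and by the final state update.
 */
class ExecutionAccumulatorStore {
    private final Map<String, Map<String, Long>> latestByExecution = new HashMap<>();
    private final Set<String> finished = new HashSet<>();

    /** Periodic report; ignored once the final values for this execution have arrived. */
    synchronized void onHeartbeat(String executionId, Map<String, Long> snapshot) {
        if (!finished.contains(executionId)) {
            // snapshots carry running totals, so the newest one replaces the previous one
            latestByExecution.put(executionId, new HashMap<>(snapshot));
        }
    }

    /** Final report sent with the terminal state transition. */
    synchronized void onFinalState(String executionId, Map<String, Long> snapshot) {
        latestByExecution.put(executionId, new HashMap<>(snapshot));
        finished.add(executionId);
    }

    /** Job-wide view: sums each accumulator over all executions. */
    synchronized Map<String, Long> aggregate() {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> perExecution : latestByExecution.values()) {
            perExecution.forEach((name, value) -> result.merge(name, value, Long::sum));
        }
        return result;
    }
}
```

Replacing rather than adding on each heartbeat is the key choice here: since a task reports its running totals, summing successive heartbeats would double-count.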
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/896#discussion_r34272308

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1017,6 +1040,9 @@ object TaskManager {
   val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds
+  /* Interval to send accumulators to the job manager */
+  val ACCUMULATOR_REPORT_INTERVAL: FiniteDuration = 10 seconds
--- End diff --

This variable is never used anywhere.
[GitHub] flink pull request: [FLINK-2292][FLINK-1573] add live per-task acc...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/896#discussion_r34273136

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -601,6 +636,11 @@ void markFinished() {
                 }
             }
+            synchronized (accumulatorLock) {
--- End diff --

This lock here seems redundant. No place is looking for those two to be in sync.
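Whether such a lock is redundant depends on whether any reader needs the two values to belong to the same update. A minimal sketch, assuming a hypothetical `ExecutionAccumulators` holder with two map fields (not the real `Execution` class): the writer replaces both maps inside `synchronized (accumulatorLock)`, and the reader takes the same lock, so it can never pair a new flink map with a stale user map.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical holder for the two accumulator maps of one execution. */
class ExecutionAccumulators {
    /** Immutable pair handed to readers. */
    static final class Pair {
        final Map<String, Long> flink;
        final Map<String, Long> user;

        Pair(Map<String, Long> flink, Map<String, Long> user) {
            this.flink = flink;
            this.user = user;
        }
    }

    private final Object accumulatorLock = new Object();
    private Map<String, Long> flinkAccumulators = Collections.emptyMap();
    private Map<String, Long> userAccumulators = Collections.emptyMap();

    /** Writer: replaces both maps in one step, e.g. when the execution finishes. */
    void update(Map<String, Long> flink, Map<String, Long> user) {
        synchronized (accumulatorLock) {
            flinkAccumulators = flink;
            userAccumulators = user;
        }
    }

    /** Reader: holds the same lock, so both maps always come from the same update. */
    Pair read() {
        synchronized (accumulatorLock) {
            return new Pair(flinkAccumulators, userAccumulators);
        }
    }
}
```

If every reader only ever looked at one of the two fields, marking them `volatile` would suffice and the lock would indeed be redundant. The lock pays off as soon as some reader needs both values from the same update.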