[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/4353 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133027325

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + *
    + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by
    + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
    + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
    + * state of a job at the time of the checkpoint.
    + *
    + * This class should be called TaskState once the old class with this name that we keep for backwards
    + * compatibility goes away.
    + */
    +public class TaskStateSnapshot implements CompositeStateHandle {
    --- End diff --

    This class is totally intended to be immutable. So beyond what it is currently enforcing, do you suggest using immutable collections inside?
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133026125

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + *
    + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by
    + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
    + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
    + * state of a job at the time of the checkpoint.
    + *
    + * This class should be called TaskState once the old class with this name that we keep for backwards
    + * compatibility goes away.
    + */
    +public class TaskStateSnapshot implements CompositeStateHandle {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    /** Mapping from an operator id to the state of one subtask of this operator */
    +    private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
    --- End diff --

    Hmm, I think if we consider default load factors and for large sizes, I would pick a min >30% hit rate linear array scan over 100% hit rate random access iteration. For all expected sizes (in cache) in this class, it should not matter. LHM also consumes a bit more memory. I would tend to keep it this way.
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133010887

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + *
    + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by
    + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
    + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
    + * state of a job at the time of the checkpoint.
    + *
    + * This class should be called TaskState once the old class with this name that we keep for backwards
    + * compatibility goes away.
    + */
    +public class TaskStateSnapshot implements CompositeStateHandle {
    --- End diff --

    Would it make sense to make this immutable? It looks like this should not be modified any more after fully constructing it.

    This would also make it clear that methods iterating over the state, or returning sets / iterables can never fail with concurrent modifications. For example the `size` method is considered a "best effort" method for info purposes only, and should not fail with an exception (it currently could fail with a `ConcurrentModificationException`).
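A minimal sketch of the immutability idea raised in this comment, not the code from the pull request. `ImmutableSnapshotSketch` is a hypothetical stand-in for `TaskStateSnapshot`, and `String` stands in for both `OperatorID` and `OperatorSubtaskState`. The constructor copies its input once and keeps only an unmodifiable view, so iteration can no longer race with a writer.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for TaskStateSnapshot; String replaces OperatorID and OperatorSubtaskState.
final class ImmutableSnapshotSketch {

    private final Map<String, String> statesByOperator;

    ImmutableSnapshotSketch(Map<String, String> states) {
        // Defensive copy, then wrap: later changes to the caller's map are not visible here.
        this.statesByOperator = Collections.unmodifiableMap(new HashMap<>(states));
    }

    // Read-only view; any attempt to modify it throws UnsupportedOperationException.
    Set<Map.Entry<String, String>> entries() {
        return statesByOperator.entrySet();
    }

    int size() {
        return statesByOperator.size();
    }

    // Demo: true if the snapshot ignores later input changes and rejects mutation through its view.
    static boolean rejectsMutation() {
        Map<String, String> input = new HashMap<>();
        input.put("op-1", "state-1");
        ImmutableSnapshotSketch snapshot = new ImmutableSnapshotSketch(input);
        input.put("op-2", "state-2"); // does not affect the snapshot
        try {
            snapshot.entries().clear();
            return false;
        } catch (UnsupportedOperationException expected) {
            return snapshot.size() == 1;
        }
    }
}
```

With this shape, a "best effort" method such as `size` only ever reads a map that nobody can change after construction.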
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133021771

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---
    @@ -878,14 +873,17 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() {
             }

             long checkpointId2 = pending2.getCheckpointId();
    -        Map<OperatorID, OperatorState> operatorStates2 = pending2.getOperatorStates();
    +        TaskStateSnapshot taskOperatorSubtaskStates2_1 = spy(new TaskStateSnapshot());
    --- End diff --

    Same as above, spying necessary?
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133013315

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---
    @@ -164,8 +168,16 @@ public void acknowledgeCheckpoint(
                     throw new RuntimeException(e);
                 }

    +            boolean hasManagedKeyedState = false;
    +            for (Map.Entry<OperatorID, OperatorSubtaskState> entry : checkpointStateHandles.getSubtaskStateMappings()) {
    +                OperatorSubtaskState state = entry.getValue();
    +                if (state != null) {
    +                    hasManagedKeyedState |= state.getManagedKeyedState() != null;
    +                }
    +            }
    +
                 // should be one k/v state
    --- End diff --

    "should be **at least** one k/v state"?
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133021720

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---
    @@ -850,18 +843,20 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() {
             OperatorID opID2 = OperatorID.fromJobVertexID(ackVertex2.getJobvertexId());
             OperatorID opID3 = OperatorID.fromJobVertexID(ackVertex3.getJobvertexId());

    -        Map<OperatorID, OperatorState> operatorStates1 = pending1.getOperatorStates();
    +        TaskStateSnapshot taskOperatorSubtaskStates1_1 = spy(new TaskStateSnapshot());
    --- End diff --

    Is spying necessary here? There seem to be no `verify()` calls on this type...
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133009796

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + *
    + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by
    + * the checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
    + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
    + * state of a job at the time of the checkpoint.
    + *
    + * This class should be called TaskState once the old class with this name that we keep for backwards
    + * compatibility goes away.
    + */
    +public class TaskStateSnapshot implements CompositeStateHandle {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    /** Mapping from an operator id to the state of one subtask of this operator */
    +    private final Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID;
    --- End diff --

    A `LinkedHashMap` has a slightly more predictable iteration performance (list traversal) compared to a `HashMap` (search through sparse table array).

    There are a lot of value iterations done in this class, but we also should have pretty full hash tables (since we never delete), so not sure how much difference it makes...
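For readers weighing the two map types, here is a small sketch under stated assumptions; it is not taken from the pull request. `MapIterationSketch` and its `String`/`Long` entries are hypothetical stand-ins for operator ids and state sizes. It shows that value iteration gives the same result with either implementation; what differs is the traversal (a `LinkedHashMap` walks its entries in insertion order, a `HashMap` scans its bucket array). With the default load factor of 0.75 and capacity doubling, a `HashMap` that only grows holds between 37.5% and 75% as many entries as it has buckets once it is past its initial capacity. The sketch makes no performance claim.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; keys stand in for operator ids, values for state sizes.
final class MapIterationSketch {

    // Sums all values by iterating; behaves identically for HashMap and LinkedHashMap.
    static long sumValues(Map<String, Long> sizes) {
        long total = 0L;
        for (long size : sizes.values()) {
            total += size;
        }
        return total;
    }

    // Inserts three entries in a fixed, non-sorted order and returns the same map.
    static Map<String, Long> fill(Map<String, Long> target) {
        target.put("op-c", 3L);
        target.put("op-a", 1L);
        target.put("op-b", 2L);
        return target;
    }

    // Captures the order in which the map hands out its keys.
    static List<String> keysInIterationOrder(Map<String, Long> map) {
        return new ArrayList<>(map.keySet());
    }

    public static void main(String[] args) {
        System.out.println(sumValues(fill(new HashMap<>())));
        System.out.println(keysInIterationOrder(fill(new LinkedHashMap<>())));
    }
}
```

Only the `LinkedHashMap` guarantees that the keys come back in insertion order; the `HashMap` order depends on the hash codes and the table size.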
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133018189 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java --- @@ -185,44 +184,66 @@ private void assignAttemptState(ExecutionJobVertex executionJobVertex, List
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133016663

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java ---
    @@ -75,31 +103,79 @@
          */
         private final long stateSize;

    +    @VisibleForTesting
    +    public OperatorSubtaskState(StreamStateHandle legacyOperatorState) {
    +
    +        this(legacyOperatorState,
    +            Collections.emptyList(),
    +            Collections.emptyList(),
    +            Collections.emptyList(),
    +            Collections.emptyList());
    +    }
    +
    +    /**
    +     * Empty state.
    +     */
    +    public OperatorSubtaskState() {
    --- End diff --

    Minor optimization: One could make this constructor `private` and have a field `OperatorSubtaskState.EMPTY` as a placeholder for the empty states. I'd leave this to you whether you think it is worth doing...
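The suggested pattern could look roughly like the sketch below. `SubtaskStateSketch` is a hypothetical, much reduced stand-in for `OperatorSubtaskState` with a single collection field; the actual class has more fields and its constructors are not reproduced here. Sharing one `EMPTY` instance is only safe because instances cannot be modified after construction.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for OperatorSubtaskState, reduced to one collection of handles.
final class SubtaskStateSketch {

    // Shared placeholder for "no state"; avoids allocating a new empty object per subtask.
    static final SubtaskStateSketch EMPTY = new SubtaskStateSketch();

    private final List<String> handles;

    // Private: callers use EMPTY instead of constructing empty instances.
    private SubtaskStateSketch() {
        this.handles = Collections.emptyList();
    }

    SubtaskStateSketch(List<String> handles) {
        // Copy and wrap so that the instance stays immutable.
        this.handles = Collections.unmodifiableList(new ArrayList<>(handles));
    }

    boolean hasState() {
        return !handles.isEmpty();
    }
}
```

Callers can then compare against the placeholder by identity (`state == SubtaskStateSketch.EMPTY`) if they need a cheap emptiness check.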
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133022095

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---
    @@ -553,31 +551,29 @@ public void testTriggerAndConfirmSimpleCheckpoint() {
             assertFalse(checkpoint.isDiscarded());
             assertFalse(checkpoint.isFullyAcknowledged());

    -        OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
    -        OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
    -
    -        Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
    -
    -        operatorStates.put(opID1, new SpyInjectingOperatorState(
    -            opID1, vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism()));
    -        operatorStates.put(opID2, new SpyInjectingOperatorState(
    -            opID2, vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism()));
    -
             // check that the vertices received the trigger checkpoint message
             {
                 verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
                 verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
             }

    +        OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
    +        OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
    +        TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class);
    --- End diff --

    Why not create a proper `TaskStateSnapshot` with one entry, rather than mocking?
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r130295736

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -208,13 +208,13 @@ public MetricGroup getMetricGroup() {
         }

         @Override
    -    public final void initializeState(OperatorStateHandles stateHandles) throws Exception {
    +    public final void initializeState(OperatorSubtaskState stateHandles) throws Exception {

             Collection keyedStateHandlesRaw = null;
             Collection operatorStateHandlesRaw = null;
             Collection operatorStateHandlesBackend = null;

    -        boolean restoring = null != stateHandles;
    +        boolean restoring = (null != stateHandles);
    --- End diff --

    +1 to keep the parentheses.

    I think we should let contributors use such styles at their discretion.
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129532719

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -208,13 +208,13 @@ public MetricGroup getMetricGroup() {
         }

         @Override
    -    public final void initializeState(OperatorStateHandles stateHandles) throws Exception {
    +    public final void initializeState(OperatorSubtaskState stateHandles) throws Exception {

             Collection keyedStateHandlesRaw = null;
             Collection operatorStateHandlesRaw = null;
             Collection operatorStateHandlesBackend = null;

    -        boolean restoring = null != stateHandles;
    +        boolean restoring = (null != stateHandles);
    --- End diff --

    I like to do this when generating a boolean out of a `!=` or `==` comparison because I find this easier to read in the presence of more than one `=` character.
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129272281

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by t
    --- End diff --

    add empty line before paragraph
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129277176 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -867,81 +845,60 @@ public String toString() { AsyncCheckpointRunnable( StreamTask owner, - List nonPartitionedStateHandles, - List snapshotInProgressList, + MapnonPartitionedStateHandles, + Map operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, long asyncStartNanos) { this.owner = Preconditions.checkNotNull(owner); - this.snapshotInProgressList = Preconditions.checkNotNull(snapshotInProgressList); + this.operatorSnapshotsInProgress = Preconditions.checkNotNull(operatorSnapshotsInProgress); this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData); this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics); this.nonPartitionedStateHandles = nonPartitionedStateHandles; this.asyncStartNanos = asyncStartNanos; - - if (!snapshotInProgressList.isEmpty()) { - // TODO Currently only the head operator of a chain can have keyed state, so simply access it directly. 
- int headIndex = snapshotInProgressList.size() - 1; - OperatorSnapshotResult snapshotInProgress = snapshotInProgressList.get(headIndex); - if (null != snapshotInProgress) { - this.futureKeyedBackendStateHandles = snapshotInProgress.getKeyedStateManagedFuture(); - this.futureKeyedStreamStateHandles = snapshotInProgress.getKeyedStateRawFuture(); - } - } } @Override public void run() { FileSystemSafetyNet.initializeSafetyNetForThread(); try { - // Keyed state handle future, currently only one (the head) operator can have this - KeyedStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles); - KeyedStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles); - - List operatorStatesBackend = new ArrayList<>(snapshotInProgressList.size()); - List operatorStatesStream = new ArrayList<>(snapshotInProgressList.size()); - - for (OperatorSnapshotResult snapshotInProgress : snapshotInProgressList) { - if (null != snapshotInProgress) { - operatorStatesBackend.add( - FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture())); - operatorStatesStream.add( - FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture())); - } else { - operatorStatesBackend.add(null); - operatorStatesStream.add(null); - } - } + boolean hasState = false; + final TaskStateSnapshot taskOperatorSubtaskStates = + new TaskStateSnapshot(operatorSnapshotsInProgress.size()); - final long asyncEndNanos = System.nanoTime(); - final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000; + for (Map.Entry entry : operatorSnapshotsInProgress.entrySet()) { - checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis); + OperatorID operatorID = entry.getKey(); + OperatorSnapshotResult snapshotInProgress = entry.getValue(); - ChainedStateHandle
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129272254

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * This class encapsulates state handles to the snapshots of all operator instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is a physical execution responsible for
    + * processing a partition of the data that goes through a logical operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + * One instance of this class contains the information that one task will send to acknowledge a checkpoint request by t
    + * he checkpoint coordinator. Tasks run operator instances in parallel, so the union of all
    + * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator from all tasks represent the whole
    + * state of a job at the time of the checkpoint.
    + * This class should be called TaskState once the old class with this name that we keep for backwards
    --- End diff --

    add empty line before paragraph
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129275823 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -208,13 +208,13 @@ public MetricGroup getMetricGroup() { } @Override - public final void initializeState(OperatorStateHandles stateHandles) throws Exception { + public final void initializeState(OperatorSubtaskState stateHandles) throws Exception { Collection keyedStateHandlesRaw = null; Collection operatorStateHandlesRaw = null; Collection operatorStateHandlesBackend = null; - boolean restoring = null != stateHandles; + boolean restoring = (null != stateHandles); --- End diff -- why did you add the braces?
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129278519 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java --- @@ -18,20 +18,43 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; /** - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}. + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical) + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all + * parallel tasks that physically execute parallelized, physical instances of the operator. + * + * The full state of the logical operator is represented by {@link OperatorState} which consists of + * {@link OperatorSubtaskState}s. + * + * Typically, we expect all collections in this class to be of size 0 or 1, because there up to one state handle --- End diff -- because there **is**
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129274874 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java --- @@ -118,10 +118,22 @@ public void testSetState() { PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next(); final long checkpointId = pending.getCheckpointId(); - SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles)); - coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, new CheckpointMetrics(), checkpointStateHandles)); + final TaskStateSnapshot subtaskStates = new TaskStateSnapshot(); + + subtaskStates.putSubtaskStateByOperatorID( + OperatorID.fromJobVertexID(statefulId), + new OperatorSubtaskState( + serializedState.get(0), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(serializedKeyGroupStates), + Collections.emptyList())); + + //SubtaskState checkpointStateHandles = new SubtaskState(serializedState, null, null, serializedKeyGroupStates, null); --- End diff -- what's up with this line?
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129269706 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java --- @@ -89,6 +89,10 @@ private GroupByStateNameResults groupByStateName( for (OperatorStateHandle psh : previousParallelSubtaskStates) { + if(psh == null) { --- End diff -- missing space after if
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129274620 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java --- @@ -3631,16 +3673,16 @@ public void testSavepointsAreNotAddedToCompletedCheckpointStore() throws Excepti completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == checkpointIDCounter.getLast()); } - private static final class SpyInjectingOperatorState extends OperatorState { - - private static final long serialVersionUID = -4004437428483663815L; - - public SpyInjectingOperatorState(OperatorID taskID, int parallelism, int maxParallelism) { - super(taskID, parallelism, maxParallelism); - } - - public void putState(int subtaskIndex, OperatorSubtaskState subtaskState) { - super.putState(subtaskIndex, spy(subtaskState)); - } - } +// private static final class SpyInjectingOperatorState extends OperatorState { --- End diff -- you can remove this (which is _really_ great...)
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129278337 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java --- @@ -75,31 +103,84 @@ */ private final long stateSize; + @VisibleForTesting + public OperatorSubtaskState(StreamStateHandle legacyOperatorState) { --- End diff -- should this constructor call the other one?
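For readers unfamiliar with the pattern behind this question, here is a minimal sketch of constructor delegation. It is not the code from the PR: the class name, the `String` stand-ins for state handles, and the size computation are all assumptions made for illustration. Only the idea of a test-only single-argument constructor next to a `stateSize` field is taken from the diff above.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

/** Sketch only: hypothetical names, Strings stand in for real state handles. */
final class ChainedConstructorSketch {

    static final class State {
        final String legacyOperatorState;               // may be null
        final Collection<String> managedOperatorState;  // never null
        final long stateSize;

        /** Convenience constructor (e.g. for tests) that delegates to the full one. */
        State(String legacyOperatorState) {
            this(legacyOperatorState, Collections.<String>emptyList());
        }

        /** Full constructor: the only place where checks and derived fields live. */
        State(String legacyOperatorState, Collection<String> managedOperatorState) {
            this.legacyOperatorState = legacyOperatorState;
            this.managedOperatorState = Objects.requireNonNull(managedOperatorState);

            // Derived value is computed once here, so both constructors agree on it.
            long size = legacyOperatorState == null ? 0 : legacyOperatorState.length();
            for (String handle : managedOperatorState) {
                size += handle.length();
            }
            this.stateSize = size;
        }
    }

    public static void main(String[] args) {
        State viaShortConstructor = new State("abc");
        System.out.println(viaShortConstructor.stateSize);
        System.out.println(viaShortConstructor.managedOperatorState.isEmpty());
    }
}
```

Delegating with `this(...)` keeps null checks and the size computation in one constructor, so the convenience constructor cannot drift out of sync with it.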
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129266693 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java --- @@ -18,20 +18,40 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; /** - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}. + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical) + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all + * parallel tasks that physically execute parallelized, physical instances of the operator. + * The full state of the logical operator is represented by {@link OperatorState} which consists of + * {@link OperatorSubtaskState}s. + * Typically, we expect all collections in this class to be of size 0 or 1, because there up to one state handle + * produced per state type (e.g. managed-keyed, raw-operator, ...). In particular, this holds when taking a snapshot. + * The purpose of having the state handles in collections is that this class is also reused in restoring state. 
+ * Under normal circumstances, the expected size of each collection is still 0 or 1, except for scale-down. In --- End diff -- In the master, we used two different classes for this purpose: `OperatorSubtaskState` to report from task to master, and `TaskStateHandles` to restore from master to task. Their difference is that in the first all fields are singletons, and in the second all are collections. Otherwise, their purpose is identical, so I collapsed them into one class.
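A minimal sketch of the idea described in this reply, assuming hypothetical names throughout (`Handle`, `SubtaskState`, `forSnapshot`, `mergeForRestore` are not Flink classes or methods). It shows one collection-based class serving both directions: a snapshot produces zero or one handle per state type, while a restore after scale-down may hand several previous handles to one new subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Sketch only: all names are hypothetical stand-ins, not Flink classes. */
final class SubtaskStateSketch {

    /** Stand-in for a state handle; it only carries a label. */
    static final class Handle {
        final String name;

        Handle(String name) {
            this.name = name;
        }
    }

    /** State of one parallel operator instance, with a collection per state type. */
    static final class SubtaskState {
        final Collection<Handle> managedKeyedState;

        SubtaskState(Collection<Handle> managedKeyedState) {
            // Defensive copy so the instance cannot change after construction.
            this.managedKeyedState =
                Collections.unmodifiableList(new ArrayList<>(managedKeyedState));
        }

        /** Reporting path: a snapshot yields zero or one handle per state type. */
        static SubtaskState forSnapshot(Handle handleOrNull) {
            if (handleOrNull == null) {
                return new SubtaskState(Collections.<Handle>emptyList());
            }
            return new SubtaskState(Collections.singletonList(handleOrNull));
        }

        /** Restore path on scale-down: one new subtask takes over several old ones. */
        static SubtaskState mergeForRestore(List<SubtaskState> previous) {
            List<Handle> all = new ArrayList<>();
            for (SubtaskState state : previous) {
                all.addAll(state.managedKeyedState);
            }
            return new SubtaskState(all);
        }
    }

    public static void main(String[] args) {
        SubtaskState first = SubtaskState.forSnapshot(new Handle("keyed-0"));
        SubtaskState second = SubtaskState.forSnapshot(new Handle("keyed-1"));
        SubtaskState restored = SubtaskState.mergeForRestore(Arrays.asList(first, second));

        System.out.println(first.managedKeyedState.size());    // prints 1
        System.out.println(restored.managedKeyedState.size()); // prints 2
    }
}
```

The sketch uses a single state type for brevity; the class in the PR keeps one such collection per state type.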
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129019373 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java --- @@ -164,8 +168,16 @@ public void acknowledgeCheckpoint( throw new RuntimeException(e); } + boolean hasKeyedManagedKeyedState = false; --- End diff -- -> `hasManagedKeyedState`?
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129020085 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java --- @@ -164,6 +269,7 @@ public long getStateSize() { // + --- End diff -- remove this empty line
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r129020863 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java --- @@ -18,20 +18,40 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; /** - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}. + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical) + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all + * parallel tasks that physically execute parallelized, physical instances of the operator. + * The full state of the logical operator is represented by {@link OperatorState} which consists of + * {@link OperatorSubtaskState}s. + * Typically, we expect all collections in this class to be of size 0 or 1, because there up to one state handle + * produced per state type (e.g. managed-keyed, raw-operator, ...). In particular, this holds when taking a snapshot. + * The purpose of having the state handles in collections is that this class is also reused in restoring state. 
+ * Under normal circumstances, the expected size of each collection is still 0 or 1, except for scale-down. In --- End diff -- How come we don't need this in the current master, where this class is also used for restoring state?
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r127709120 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java --- @@ -18,20 +18,40 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; /** - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}. + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical) + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all + * parallel tasks that physically execute parallelized, physical instances of the operator. + * The full state of the logical operator is represented by {@link OperatorState} which consists of --- End diff -- please add an empty line before the `<p>` tag so we have to make fewer changes when activating checkstyle.
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r127708906 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java --- @@ -18,20 +18,40 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; /** - * Container for the state of one parallel subtask of an operator. This is part of the {@link OperatorState}. + * This class encapsulates the state for one parallel instance of an operator. The complete state of a (logical) + * operator (e.g. a flatmap operator) consists of the union of all {@link OperatorSubtaskState}s from all + * parallel tasks that physically execute parallelized, physical instances of the operator. + * The full state of the logical operator is represented by {@link OperatorState} which consists of + * {@link OperatorSubtaskState}s. + * Typically, we expect all collections in this class to be of size 0 or 1, because there up to one state handle + * produced per state type (e.g. managed-keyed, raw-operator, ...). In particular, this holds when taking a snapshot. + * The purpose of having the state handles in collections is that this class is also reused in restoring state. 
+ * Under normal circumstances, the expected size of each collection is still 0 or 1, except for scale-down. In + * scale-down, one operator subtask can become responsible for the state of multiple previous subtasks. The collections + * can then store all the state handles that are relevant to build up the new subtask state. + * There is no collection for legacy state because it is nor rescalable. --- End diff -- typo: nor -> not
[GitHub] flink pull request #4353: [FLINK-7213] Introduce state management by Operato...
GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/4353 [FLINK-7213] Introduce state management by OperatorID in TaskManager FLINK-5892 introduced the job manager / checkpoint coordinator part of managing state on the operator level instead of the task level by introducing explicit operator_id -> state mappings. However, this explicit mapping was not introduced on the task manager side, so it is still converted into a mapping that follows the implicit operator chain order. This PR adds the missing part and offers explicit state management by operator_id in the task manager. Furthermore, this PR introduces `TaskStateSnapshot` as a unifying abstraction to replace `TaskStateHandles` and `SubtaskState`, which were always very similar, except that one offered collections of state handles (to support scaling in on restore) while the other contained only single objects (because each state is snapshotted into one state handle). You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink tmpBU Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4353.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4353 commit d68dd39b343595120f62fc8280b2f3c5f0ee7503 Author: Stefan Richter Date: 2017-06-26T16:07:59Z [FLINK-7213] Introduce state management by OperatorID in TaskManager commit 11cdd85668aa18f8e5bab0e6cac9ba082bfea95c Author: Stefan Richter Date: 2017-07-11T15:10:03Z [FLINK-7213] Introduce TaskStateSnapshot to unify TaskStateHandles and SubtaskState
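To illustrate the explicit operator_id -> state mapping described in the PR text, here is a rough sketch under stated assumptions. `putSubtaskStateByOperatorID` is the method name used in the test diff quoted earlier in this thread; everything else (the class name, the getter, and the use of plain `String` values in place of `OperatorID` and `OperatorSubtaskState`) is hypothetical and only meant to show the lookup idea.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: Strings stand in for OperatorID and OperatorSubtaskState. */
final class OperatorIdMappingSketch {

    /** Hypothetical per-task snapshot that keys each operator's state by its id. */
    static final class TaskSnapshot {
        private final Map<String, String> subtaskStatesByOperatorId = new HashMap<>();

        /** Registers state under an operator id and returns the previous entry, if any. */
        String putSubtaskStateByOperatorID(String operatorId, String state) {
            return subtaskStatesByOperatorId.put(operatorId, state);
        }

        /** Assumed getter: returns the state for the id, or null if none was registered. */
        String getSubtaskStateByOperatorID(String operatorId) {
            return subtaskStatesByOperatorId.get(operatorId);
        }
    }

    public static void main(String[] args) {
        TaskSnapshot snapshot = new TaskSnapshot();

        // Operators of one chain register independently of their chain position.
        snapshot.putSubtaskStateByOperatorID("filter-op", "state-of-filter");
        snapshot.putSubtaskStateByOperatorID("map-op", "state-of-map");

        // Lookup is by id, so it does not depend on the order in the chain.
        System.out.println(snapshot.getSubtaskStateByOperatorID("map-op"));
        System.out.println(snapshot.getSubtaskStateByOperatorID("sink-op"));
    }
}
```

Because lookup goes through the id rather than a list position, an operator without registered state simply yields no entry instead of shifting the states of its chain neighbours.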