[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397116721 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java ## @@ -57,7 +57,13 @@ ((ProcessingTimeServiceAware) operatorFactory).setProcessingTimeService(processingTimeService); } - OP op = operatorFactory.createStreamOperator(containingTask, configuration, output); + // TODO: what to do with ProcessingTimeServiceAware? + OP op = operatorFactory.createStreamOperator( + new StreamOperatorInitializer<>( + containingTask, + configuration, + output, + processingTimeService)); Review comment: Annotate it with `@Deprecated` and mark it for removal. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397117123 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.fail; + +/** + * Tests for {@link StreamOperatorStateHandlerTest}. 
+ */ +public class StreamOperatorStateHandlerTest { + /** +* Tests that a failing snapshot method call to the keyed state backend will trigger the closing +* of the StateSnapshotContextSynchronousImpl and the cancellation of the +* OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures. +*/ + @Test + public void testFailingBackendSnapshotMethod() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + final CloseableRegistry closeableRegistry = new CloseableRegistry(); + + RunnableFuture> keyedStateManagedFuture = new CancelableFuture<>(); + RunnableFuture> keyedStateRawFuture = new CancelableFuture<>(); + RunnableFuture> operatorStateManagedFuture = new CancelableFuture<>(); + RunnableFuture> operatorStateRawFuture = new CancelableFuture<>(); + + OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures( + keyedStateManagedFuture, + keyedStateRawFuture, + operatorStateManagedFuture, + operatorStateRawFuture); + + StateSnapshotContextSynchronousImpl context = new TestStateSnapshotContextSynchronousImpl(checkpointId, timestamp, closeableRegistry); + context.getRawKeyedOperatorStateOutput(); + context.getRawOperatorStateOutput(); + + StreamTaskStateInitializerImpl stateInitializer = + new StreamTaskStateInitializerImpl(new MockEnvironmentBuilder().build(), new MemoryStateBackend()); + StreamOperatorStateContext stateContext = stateInitializer.streamOperatorStateContext( + new OperatorID(), +
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r396956395 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); + }
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r396961853 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java ## @@ -17,10 +17,13 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.annotation.PublicEvolving; + /** * An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators needs to be created * through a factory implementing this interface. */ +@PublicEvolving public interface YieldingOperatorFactory extends StreamOperatorFactory { Review comment: Haven't thought about this case. Since it's `Experimental`, I'm also fine with keeping it for now. In general, I don't think we should mix patterns though. Here is one possible solution: `StreamOperatorFactory` could have a `default boolean needsMailboxExecutor() { return false; }`, which triggers a nullable `mailboxExecutor` to be set in `Parameters`. The same method can be used to determine chainability. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393564140 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); + }
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392906080 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.fail; + +/** + * Tests for {@link StreamOperatorStateHandlerTest}. 
+ */ +public class StreamOperatorStateHandlerTest { + /** +* Tests that a failing snapshot method call to the keyed state backend will trigger the closing +* of the StateSnapshotContextSynchronousImpl and the cancellation of the +* OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures. +*/ + @Test + public void testFailingBackendSnapshotMethod() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + final CloseableRegistry closeableRegistry = new CloseableRegistry(); + + RunnableFuture> keyedStateManagedFuture = new CancelableFuture<>(); + RunnableFuture> keyedStateRawFuture = new CancelableFuture<>(); + RunnableFuture> operatorStateManagedFuture = new CancelableFuture<>(); + RunnableFuture> operatorStateRawFuture = new CancelableFuture<>(); + + OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures( + keyedStateManagedFuture, + keyedStateRawFuture, + operatorStateManagedFuture, + operatorStateRawFuture); + + StateSnapshotContextSynchronousImpl context = new TestStateSnapshotContextSynchronousImpl(checkpointId, timestamp, closeableRegistry); + context.getRawKeyedOperatorStateOutput(); + context.getRawOperatorStateOutput(); + + StreamTaskStateInitializerImpl stateInitializer = + new StreamTaskStateInitializerImpl(new MockEnvironmentBuilder().build(), new MemoryStateBackend()); + StreamOperatorStateContext stateContext = stateInitializer.streamOperatorStateContext( + new OperatorID(), +
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392923505 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java ## @@ -57,7 +57,13 @@ ((ProcessingTimeServiceAware) operatorFactory).setProcessingTimeService(processingTimeService); } - OP op = operatorFactory.createStreamOperator(containingTask, configuration, output); + // TODO: what to do with ProcessingTimeServiceAware? + OP op = operatorFactory.createStreamOperator( + new StreamOperatorInitializer<>( + containingTask, + configuration, + output, + processingTimeService)); Review comment: In general, as written above. Always passing `timeService` comes closer to my understanding of a factory (factory being stateless except for fundamental configurations that would change the type of the returned operator for all invocations of `createStreamOperator`). The factory then decides if it wants to use the service or not. If the service `processingTimeService` would only be (costly) created for a specific operator factory (e.g. MailboxExecutor being used only in AsyncWaitOperatorFactory), then I'd wrap the creation in a supplier. Ultimately, we would get rid of all the different OperatorFactory interfaces except for the main one. Then I'd be perfectly fine to keep factories and not convert them into builders. Note that for that goal, we would need to get rid of SimpleOperatorFactory: Once an operator has been created, it cannot go back into a factory. If we need that functionality, then I only see builder pattern as a clean solution, where going back and forth between operator and operator builder is doable. Last remark, if `StreamOperatorInitializer` ends up with 10+ fields that are all passed on construction, I'd probably switch to a builder style, but that can also be done later. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392925814 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorInitializer.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * Helper class to construct {@link StreamOperatorBase}. Wraps couple of internal parameters + * to simplify for users construction of classes extending {@link StreamOperatorBase} and to + * allow for backward compatible changes in the {@link StreamOperatorBase}'s constructor. + */ +@Experimental +public class StreamOperatorInitializer { Review comment: `OUT` needs a Javadoc `@param` tag: I initially thought `OUT` was the type of the operator. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392914268 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorFactory.java ## @@ -58,11 +56,11 @@ public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { } @Override - public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config, Output output) { + public > T createStreamOperator(StreamOperatorInitializer initializer) { Review comment: +1 to the idea. Long overdue. However, `StreamOperatorInitializer` sounds like something active, while it's just a parameter object. How about `StreamOperatorSettings` or `StreamOperatorParameters`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392930813 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java ## @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; +import 
org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; + +/** + * New base class for all stream operators, replacing previous {@link AbstractStreamOperator}. + * Currently intended to work with {@link MultipleInputStreamOperator}. + * + * One note-able difference in comparison to {@link AbstractStreamOperator} is lack of + * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation + * in the constructor, and removed some tight coupling with classes like {@link StreamTask}. + * + * Methods are guaranteed not to be called concurrently. + * + * @param The output type of the operator + */ +@Experimental +public abstract class StreamOperatorBase implements StreamOperator { + /** The logger used by the operator class and its subclasses. */ + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorBase.class); + + protected final StreamConfig config; + protected final Output> output; + private final StreamingRuntimeContext runtimeContext; + private final ExecutionConfig executionConfig; + private final ClassLoader userCodeClassLoader; + private final CloseableRegistry cancelables; + private final long[] inputWatermarks; + + /** Metric group for the operator. */ + protected final OperatorMetricGroup metrics; + protected final LatencyStats latencyStats; + protected final ProcessingTimeService processingTimeService; + + private StreamOperatorStateHandler stateHandler; + + // We keep track of watermarks from both inputs, the combined input is the minimum + // Once the minimum advances we emit a new
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392927611 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java ## @@ -61,16 +59,16 @@ public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { } @Override - public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config, Output output) { + public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorInitializer<OUT> initializer) { Review comment: @rkhachatryan wanted to get rid of this method's generic, afaik. It's really a bit of an anti-pattern, so I'm not sure going into it makes any sense. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392925090 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorInitializer.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * Helper class to construct {@link StreamOperatorBase}. Wraps couple of internal parameters + * to simplify for users construction of classes extending {@link StreamOperatorBase} and to + * allow for backward compatible changes in the {@link StreamOperatorBase}'s constructor. + */ +@Experimental +public class StreamOperatorInitializer { Review comment: Candidate for builder pattern as described above. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392897158 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java ## @@ -133,4 +136,26 @@ static void execute( } } + private static void checkpointStreamOperator( + StreamOperator op, Review comment: nit: indent This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392932095 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java ## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; +import 
org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; + +/** + * New base class for all stream operators, replacing previous {@link AbstractStreamOperator}. + * Currently intended to work with {@link MultipleInputStreamOperator}. + * + * One note-able difference in comparison to {@link AbstractStreamOperator} is lack of + * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation + * in the constructor, and removed some tight coupling with classes like {@link StreamTask}. + * + * Methods are guaranteed not to be called concurrently. + * + * @param The output type of the operator + */ +@Experimental +public abstract class StreamOperatorBase implements StreamOperator { Review comment: I'm fine with both. V2 conveys to me that this is the only Flink left in Flink 2.0. If that roughly corresponds with your deprecation plan, then I like it more. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392898810 ## File path: flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java ## @@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.CheckpointStorageWorkerView; Review comment: Regarding the commit: its message could reflect that it's actually moved into `SetupableOperator`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392903610 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.fail; + +/** + * Tests for {@link StreamOperatorStateHandlerTest}. 
+ */ +public class StreamOperatorStateHandlerTest { + /** +* Tests that a failing snapshot method call to the keyed state backend will trigger the closing +* of the StateSnapshotContextSynchronousImpl and the cancellation of the +* OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures. +*/ + @Test + public void testFailingBackendSnapshotMethod() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + final CloseableRegistry closeableRegistry = new CloseableRegistry(); Review comment: Please check whether this `CloseableRegistry` needs to be closed at the end of the test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392907594 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.fail; + +/** + * Tests for {@link StreamOperatorStateHandlerTest}. 
+ */ +public class StreamOperatorStateHandlerTest { + /** +* Tests that a failing snapshot method call to the keyed state backend will trigger the closing +* of the StateSnapshotContextSynchronousImpl and the cancellation of the +* OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures. +*/ + @Test + public void testFailingBackendSnapshotMethod() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + final CloseableRegistry closeableRegistry = new CloseableRegistry(); + + RunnableFuture> keyedStateManagedFuture = new CancelableFuture<>(); + RunnableFuture> keyedStateRawFuture = new CancelableFuture<>(); + RunnableFuture> operatorStateManagedFuture = new CancelableFuture<>(); + RunnableFuture> operatorStateRawFuture = new CancelableFuture<>(); + + OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures( + keyedStateManagedFuture, + keyedStateRawFuture, + operatorStateManagedFuture, + operatorStateRawFuture); + + StateSnapshotContextSynchronousImpl context = new TestStateSnapshotContextSynchronousImpl(checkpointId, timestamp, closeableRegistry); + context.getRawKeyedOperatorStateOutput(); + context.getRawOperatorStateOutput(); + + StreamTaskStateInitializerImpl stateInitializer = + new StreamTaskStateInitializerImpl(new MockEnvironmentBuilder().build(), new MemoryStateBackend()); + StreamOperatorStateContext stateContext = stateInitializer.streamOperatorStateContext( + new OperatorID(), +
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392933478 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java ## @@ -17,10 +17,13 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.annotation.PublicEvolving; + /** * An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators needs to be created * through a factory implementing this interface. */ +@PublicEvolving public interface YieldingOperatorFactory extends StreamOperatorFactory { Review comment: This interface should be removed and merged into `StreamOperatorInitializer` by exposing a `Supplier<MailboxExecutor> mailboxExecutorFactory`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392897536 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java ## @@ -133,4 +136,26 @@ static void execute( } } + private static void checkpointStreamOperator( + StreamOperator op, + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + CheckpointStreamFactory storageLocation, + HashMap operatorSnapshotsInProgress, Review comment: Use `Map` instead of `HashMap` here, or better, return the `OperatorSnapshotFutures` and put it into the map on the caller side. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r392905579 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.fail; + +/** + * Tests for {@link StreamOperatorStateHandlerTest}. 
+ */ +public class StreamOperatorStateHandlerTest { + /** +* Tests that a failing snapshot method call to the keyed state backend will trigger the closing +* of the StateSnapshotContextSynchronousImpl and the cancellation of the +* OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures. +*/ + @Test + public void testFailingBackendSnapshotMethod() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + final CloseableRegistry closeableRegistry = new CloseableRegistry(); + + RunnableFuture> keyedStateManagedFuture = new CancelableFuture<>(); + RunnableFuture> keyedStateRawFuture = new CancelableFuture<>(); + RunnableFuture> operatorStateManagedFuture = new CancelableFuture<>(); + RunnableFuture> operatorStateRawFuture = new CancelableFuture<>(); + + OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures( + keyedStateManagedFuture, + keyedStateRawFuture, + operatorStateManagedFuture, + operatorStateRawFuture); + + StateSnapshotContextSynchronousImpl context = new TestStateSnapshotContextSynchronousImpl(checkpointId, timestamp, closeableRegistry); + context.getRawKeyedOperatorStateOutput(); + context.getRawOperatorStateOutput(); + + StreamTaskStateInitializerImpl stateInitializer = + new StreamTaskStateInitializerImpl(new MockEnvironmentBuilder().build(), new MemoryStateBackend()); + StreamOperatorStateContext stateContext = stateInitializer.streamOperatorStateContext( + new OperatorID(), +