[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397116721
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
 ##
 @@ -57,7 +57,13 @@
((ProcessingTimeServiceAware) 
operatorFactory).setProcessingTimeService(processingTimeService);
}
 
-   OP op = operatorFactory.createStreamOperator(containingTask, 
configuration, output);
+   // TODO: what to do with ProcessingTimeServiceAware?
+   OP op = operatorFactory.createStreamOperator(
+   new StreamOperatorInitializer<>(
+   containingTask,
+   configuration,
+   output,
+   processingTimeService));
 
 Review comment:
   Mark it as `@Deprecated` and schedule it for removal.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397117123
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link StreamOperatorStateHandlerTest}.
+ */
+public class StreamOperatorStateHandlerTest {
+   /**
+* Tests that a failing snapshot method call to the keyed state backend 
will trigger the closing
+* of the StateSnapshotContextSynchronousImpl and the cancellation of 
the
+* OperatorSnapshotResult. The latter is supposed to also cancel all 
assigned futures.
+*/
+   @Test
+   public void testFailingBackendSnapshotMethod() throws Exception {
+   final long checkpointId = 42L;
+   final long timestamp = 1L;
+
+   final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
+
+   RunnableFuture> 
keyedStateManagedFuture = new CancelableFuture<>();
+   RunnableFuture> 
keyedStateRawFuture = new CancelableFuture<>();
+   RunnableFuture> 
operatorStateManagedFuture = new CancelableFuture<>();
+   RunnableFuture> 
operatorStateRawFuture = new CancelableFuture<>();
+
+   OperatorSnapshotFutures operatorSnapshotResult = new 
OperatorSnapshotFutures(
+   keyedStateManagedFuture,
+   keyedStateRawFuture,
+   operatorStateManagedFuture,
+   operatorStateRawFuture);
+
+   StateSnapshotContextSynchronousImpl context = new 
TestStateSnapshotContextSynchronousImpl(checkpointId, timestamp, 
closeableRegistry);
+   context.getRawKeyedOperatorStateOutput();
+   context.getRawOperatorStateOutput();
+
+   StreamTaskStateInitializerImpl stateInitializer =
+   new StreamTaskStateInitializerImpl(new 
MockEnvironmentBuilder().build(), new MemoryStateBackend());
+   StreamOperatorStateContext stateContext = 
stateInitializer.streamOperatorStateContext(
+   new OperatorID(),
+  

[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396956395
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+   }

[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396961853
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java
 ##
 @@ -17,10 +17,13 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 /**
  * An operator that needs access to the {@link MailboxExecutor} to yield to 
downstream operators needs to be created
  * through a factory implementing this interface.
  */
+@PublicEvolving
 public interface YieldingOperatorFactory extends 
StreamOperatorFactory {
 
 Review comment:
   I hadn't thought about this case. Since it's `Experimental`, I'm also fine 
with keeping it for now.
   In general, I don't think we should mix patterns, though. 
   Here is one possible solution: `StreamOperatorFactory` could have a `default 
boolean needsMailboxExecutor() { return false; }`, which triggers a nullable 
`mailboxExecutor` to be set in `Parameters`. The same method can be used to 
determine chainability.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-17 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393564140
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+   }

[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392906080
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link StreamOperatorStateHandlerTest}.
+ */
+public class StreamOperatorStateHandlerTest {
+   /**
+* Tests that a failing snapshot method call to the keyed state backend 
will trigger the closing
+* of the StateSnapshotContextSynchronousImpl and the cancellation of 
the
+* OperatorSnapshotResult. The latter is supposed to also cancel all 
assigned futures.
+*/
+   @Test
+   public void testFailingBackendSnapshotMethod() throws Exception {
+   final long checkpointId = 42L;
+   final long timestamp = 1L;
+
+   final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
+
+   RunnableFuture> 
keyedStateManagedFuture = new CancelableFuture<>();
+   RunnableFuture> 
keyedStateRawFuture = new CancelableFuture<>();
+   RunnableFuture> 
operatorStateManagedFuture = new CancelableFuture<>();
+   RunnableFuture> 
operatorStateRawFuture = new CancelableFuture<>();
+
+   OperatorSnapshotFutures operatorSnapshotResult = new 
OperatorSnapshotFutures(
+   keyedStateManagedFuture,
+   keyedStateRawFuture,
+   operatorStateManagedFuture,
+   operatorStateRawFuture);
+
+   StateSnapshotContextSynchronousImpl context = new 
TestStateSnapshotContextSynchronousImpl(checkpointId, timestamp, 
closeableRegistry);
+   context.getRawKeyedOperatorStateOutput();
+   context.getRawOperatorStateOutput();
+
+   StreamTaskStateInitializerImpl stateInitializer =
+   new StreamTaskStateInitializerImpl(new 
MockEnvironmentBuilder().build(), new MemoryStateBackend());
+   StreamOperatorStateContext stateContext = 
stateInitializer.streamOperatorStateContext(
+   new OperatorID(),
+  

[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392923505
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
 ##
 @@ -57,7 +57,13 @@
((ProcessingTimeServiceAware) 
operatorFactory).setProcessingTimeService(processingTimeService);
}
 
-   OP op = operatorFactory.createStreamOperator(containingTask, 
configuration, output);
+   // TODO: what to do with ProcessingTimeServiceAware?
+   OP op = operatorFactory.createStreamOperator(
+   new StreamOperatorInitializer<>(
+   containingTask,
+   configuration,
+   output,
+   processingTimeService));
 
 Review comment:
   In general, +1, as written above.
   
   Always passing `timeService` comes closer to my understanding of a factory 
(factory being stateless except for fundamental configurations that would 
change the type of the returned operator for all invocations of 
`createStreamOperator`). The factory then decides if it wants to use the 
service or not.
   
   If the `processingTimeService` were created (at some cost) only for a 
specific operator factory (e.g. MailboxExecutor being used only in 
AsyncWaitOperatorFactory), then I'd wrap the creation in a supplier.
   
   Ultimately, we would get rid of all the different OperatorFactory interfaces 
except for the main one. Then I'd be perfectly fine to keep factories and not 
convert them into builders.
   Note that for that goal, we would need to get rid of SimpleOperatorFactory: once 
an operator has been created, it cannot go back into a factory. If we need that 
functionality, then I only see the builder pattern as a clean solution, where going 
back and forth between operator and operator builder is doable.
   
   Last remark, if `StreamOperatorInitializer` ends up with 10+ fields that are 
all passed on construction, I'd probably switch to a builder style, but that 
can also be done later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392925814
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorInitializer.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Helper  class to construct {@link StreamOperatorBase}. Wraps couple of 
internal parameters
+ * to simplify for users construction of classes extending {@link 
StreamOperatorBase} and to
+ * allow for backward compatible changes in the {@link StreamOperatorBase}'s 
constructor.
+ */
+@Experimental
+public class StreamOperatorInitializer {
 
 Review comment:
   `OUT` needs a Javadoc `@param` tag: I initially thought `OUT` was the type of the operator.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392914268
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorFactory.java
 ##
 @@ -58,11 +56,11 @@ public void setMailboxExecutor(MailboxExecutor 
mailboxExecutor) {
}
 
@Override
-   public StreamOperator createStreamOperator(StreamTask containingTask, 
StreamConfig config, Output output) {
+   public > T 
createStreamOperator(StreamOperatorInitializer initializer) {
 
 Review comment:
   +1 to the idea. Long overdue. However, `StreamOperatorInitializer` sounds 
like something active, while it's just a parameter object. How about 
`StreamOperatorSettings` or `StreamOperatorParameters`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392930813
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing previous {@link 
AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One note-able difference in comparison to {@link AbstractStreamOperator} 
is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in 
favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like 
{@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param  The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase implements StreamOperator {
+   /** The logger used by the operator class and its subclasses. */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorBase.class);
+
+   protected final StreamConfig config;
+   protected final Output> output;
+   private final StreamingRuntimeContext runtimeContext;
+   private final ExecutionConfig executionConfig;
+   private final ClassLoader userCodeClassLoader;
+   private final CloseableRegistry cancelables;
+   private final long[] inputWatermarks;
+
+   /** Metric group for the operator. */
+   protected final OperatorMetricGroup metrics;
+   protected final LatencyStats latencyStats;
+   protected final ProcessingTimeService processingTimeService;
+
+   private StreamOperatorStateHandler stateHandler;
+
+   // We keep track of watermarks from both inputs, the combined input is 
the minimum
+   // Once the minimum advances we emit a new 

[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392927611
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
 ##
 @@ -61,16 +59,16 @@ public void setMailboxExecutor(MailboxExecutor 
mailboxExecutor) {
}
 
@Override
-   public StreamOperator createStreamOperator(StreamTask containingTask, 
StreamConfig config, Output output) {
+   public > T 
createStreamOperator(StreamOperatorInitializer initializer) {
 
 Review comment:
   @rkhachatryan wanted to get rid of this method's generic parameter, AFAIK. 
It's really a bit of an anti-pattern, so I'm not sure going into it makes any sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392925090
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorInitializer.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Helper  class to construct {@link StreamOperatorBase}. Wraps couple of 
internal parameters
+ * to simplify for users construction of classes extending {@link 
StreamOperatorBase} and to
+ * allow for backward compatible changes in the {@link StreamOperatorBase}'s 
constructor.
+ */
+@Experimental
+public class StreamOperatorInitializer {
 
 Review comment:
   Candidate for builder pattern as described above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392897158
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java
 ##
 @@ -133,4 +136,26 @@ static void execute(
}
}
 
+   private static void checkpointStreamOperator(
+   StreamOperator op,
 
 Review comment:
   nit: indent


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392932095
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing previous {@link 
AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One note-able difference in comparison to {@link AbstractStreamOperator} 
is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in 
favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like 
{@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param  The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase implements StreamOperator {
 
 Review comment:
   I'm fine with both. V2 conveys to me that this is the only one left in 
Flink 2.0. If that roughly corresponds with your deprecation plan, then I like 
it more.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392898810
 
 

 ##
 File path: 
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
 ##
 @@ -26,7 +26,6 @@
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 
 Review comment:
   Regarding the commit: its message could reflect that it's actually moved 
into `SetupableOperator`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392903610
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link StreamOperatorStateHandlerTest}.
+ */
+public class StreamOperatorStateHandlerTest {
+   /**
+* Tests that a failing snapshot method call to the keyed state backend 
will trigger the closing
+* of the StateSnapshotContextSynchronousImpl and the cancellation of 
the
+* OperatorSnapshotResult. The latter is supposed to also cancel all 
assigned futures.
+*/
+   @Test
+   public void testFailingBackendSnapshotMethod() throws Exception {
+   final long checkpointId = 42L;
+   final long timestamp = 1L;
+
+   final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
 Review comment:
   Please check if that needs to be closed at the end of the test.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392907594
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link StreamOperatorStateHandlerTest}.
+ */
+public class StreamOperatorStateHandlerTest {
+   /**
+* Tests that a failing snapshot method call to the keyed state backend 
will trigger the closing
+* of the StateSnapshotContextSynchronousImpl and the cancellation of 
the
+* OperatorSnapshotResult. The latter is supposed to also cancel all 
assigned futures.
+*/
+   @Test
+   public void testFailingBackendSnapshotMethod() throws Exception {
+   final long checkpointId = 42L;
+   final long timestamp = 1L;
+
+   final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
+
+   RunnableFuture> 
keyedStateManagedFuture = new CancelableFuture<>();
+   RunnableFuture> 
keyedStateRawFuture = new CancelableFuture<>();
+   RunnableFuture> 
operatorStateManagedFuture = new CancelableFuture<>();
+   RunnableFuture> 
operatorStateRawFuture = new CancelableFuture<>();
+
+   OperatorSnapshotFutures operatorSnapshotResult = new 
OperatorSnapshotFutures(
+   keyedStateManagedFuture,
+   keyedStateRawFuture,
+   operatorStateManagedFuture,
+   operatorStateRawFuture);
+
+   StateSnapshotContextSynchronousImpl context = new 
TestStateSnapshotContextSynchronousImpl(checkpointId, timestamp, 
closeableRegistry);
+   context.getRawKeyedOperatorStateOutput();
+   context.getRawOperatorStateOutput();
+
+   StreamTaskStateInitializerImpl stateInitializer =
+   new StreamTaskStateInitializerImpl(new 
MockEnvironmentBuilder().build(), new MemoryStateBackend());
+   StreamOperatorStateContext stateContext = 
stateInitializer.streamOperatorStateContext(
+   new OperatorID(),
+  

[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392933478
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java
 ##
 @@ -17,10 +17,13 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 /**
  * An operator that needs access to the {@link MailboxExecutor} to yield to 
downstream operators needs to be created
  * through a factory implementing this interface.
  */
+@PublicEvolving
 public interface YieldingOperatorFactory extends 
StreamOperatorFactory {
 
 Review comment:
   This interface should be removed and merged into `StreamOperatorInitializer` 
by exposing a `Supplier<MailboxExecutor> mailboxExecutorFactory`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392897536
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointingOperation.java
 ##
 @@ -133,4 +136,26 @@ static void execute(
}
}
 
+   private static void checkpointStreamOperator(
+   StreamOperator op,
+   CheckpointMetaData checkpointMetaData,
+   CheckpointOptions checkpointOptions,
+   CheckpointStreamFactory storageLocation,
+   HashMap 
operatorSnapshotsInProgress,
 
 Review comment:
   Use `Map` instead of `HashMap`, or better, return the 
`OperatorSnapshotFutures` and put it into the map on the caller side.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
AHeise commented on a change in pull request #11403: [FLINK-16316][operators] 
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392905579
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link StreamOperatorStateHandlerTest}.
+ */
+public class StreamOperatorStateHandlerTest {
+   /**
+* Tests that a failing snapshot method call to the keyed state backend 
will trigger the closing
+* of the StateSnapshotContextSynchronousImpl and the cancellation of 
the
+* OperatorSnapshotResult. The latter is supposed to also cancel all 
assigned futures.
+*/
+   @Test
+   public void testFailingBackendSnapshotMethod() throws Exception {
+   final long checkpointId = 42L;
+   final long timestamp = 1L;
+
+   final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
+
+   RunnableFuture> 
keyedStateManagedFuture = new CancelableFuture<>();
+   RunnableFuture> 
keyedStateRawFuture = new CancelableFuture<>();
+   RunnableFuture> 
operatorStateManagedFuture = new CancelableFuture<>();
+   RunnableFuture> 
operatorStateRawFuture = new CancelableFuture<>();
+
+   OperatorSnapshotFutures operatorSnapshotResult = new 
OperatorSnapshotFutures(
+   keyedStateManagedFuture,
+   keyedStateRawFuture,
+   operatorStateManagedFuture,
+   operatorStateRawFuture);
+
+   StateSnapshotContextSynchronousImpl context = new 
TestStateSnapshotContextSynchronousImpl(checkpointId, timestamp, 
closeableRegistry);
+   context.getRawKeyedOperatorStateOutput();
+   context.getRawOperatorStateOutput();
+
+   StreamTaskStateInitializerImpl stateInitializer =
+   new StreamTaskStateInitializerImpl(new 
MockEnvironmentBuilder().build(), new MemoryStateBackend());
+   StreamOperatorStateContext stateContext = 
stateInitializer.streamOperatorStateContext(
+   new OperatorID(),
+