[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-25 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397794870
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+   }
+
+   public void initializeOperatorState(CheckpointedStreamOperator 
streamOperator) throws Exception {
+   CloseableIterable 
keyedStateInputs = context.rawKeyedStateInputs();
+   CloseableIterable 
operatorStateInputs = context.rawOperatorStateInputs();
+
+   try {
+   StateInitializationContext initializationContext = new 
StateInitializationContextImpl(
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-25 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397793382
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+   }
+
+   public void initializeOperatorState(CheckpointedStreamOperator 
streamOperator) throws Exception {
+   CloseableIterable 
keyedStateInputs = context.rawKeyedStateInputs();
+   CloseableIterable 
operatorStateInputs = context.rawOperatorStateInputs();
+
+   try {
+   StateInitializationContext initializationContext = new 
StateInitializationContextImpl(
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-25 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397793382
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+   }
+
+   public void initializeOperatorState(CheckpointedStreamOperator 
streamOperator) throws Exception {
+   CloseableIterable 
keyedStateInputs = context.rawKeyedStateInputs();
+   CloseableIterable 
operatorStateInputs = context.rawOperatorStateInputs();
+
+   try {
+   StateInitializationContext initializationContext = new 
StateInitializationContextImpl(
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-25 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397788433
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397138744
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+   }
+
+   public void 
initializeOperatorState(ThrowingConsumer 
initializeOperatorAction) throws Exception {
+   CloseableIterable 
keyedStateInputs = context.rawKeyedStateInputs();
+   CloseableIterable 
operatorStateInputs = context.rawOperatorStateInputs();
+
+   try {
+   StateInitializationContext initializationContext = new 
StateInitializationContextImpl(
+ 

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397089080
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397081550
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+   }
+
+   public void 
initializeOperatorState(ThrowingConsumer 
initializeOperatorAction) throws Exception {
+   CloseableIterable 
keyedStateInputs = context.rawKeyedStateInputs();
+   CloseableIterable 
operatorStateInputs = context.rawOperatorStateInputs();
+
+   try {
+   StateInitializationContext initializationContext = new 
StateInitializationContextImpl(
+ 

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397059586
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, intended to eventually replace 
{@link AbstractStreamOperator}.
+ * Currently intended to work smoothly just with {@link 
MultipleInputStreamOperator}.
+ *
+ * One note-able difference in comparison to {@link AbstractStreamOperator} 
is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in 
favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like 
{@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param  The output type of the operator
+ */
+@Experimental
+public abstract class AbstractStreamOperatorV2 implements 
StreamOperator {
+   /** The logger used by the operator class and its subclasses. */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractStreamOperatorV2.class);
+
+   protected final StreamConfig config;
+   protected final Output> output;
+   private final StreamingRuntimeContext runtimeContext;
+   private final ExecutionConfig executionConfig;
+   private final ClassLoader userCodeClassLoader;
+   private final CloseableRegistry cancelables;
+   private final long[] inputWatermarks;
+
+   /** Metric group for the operator. */
+   protected final OperatorMetricGroup metrics;
+   protected final LatencyStats latencyStats;
+   protected final ProcessingTimeService processingTimeService;
+
+   private StreamOperatorStateHandler stateHandler;
+
+   // We keep track of watermarks from both inputs, the combined input is 
the minimum
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397053567
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396562862
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396521004
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
 ##
 @@ -32,14 +30,13 @@
  *
  * @param  The output type of the operator
  */
-@Internal
+@PublicEvolving
 public interface StreamOperatorFactory extends Serializable {
 
 Review comment:
   `@Experimental` looks good to me 👍


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396394005
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceM

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396389898
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, intended to eventually replace 
{@link AbstractStreamOperator}.
+ * Currently intended to work smoothly just with {@link 
MultipleInputStreamOperator}.
+ *
+ * One note-able difference in comparison to {@link AbstractStreamOperator} 
is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in 
favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like 
{@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param  The output type of the operator
+ */
+@Experimental
+public abstract class AbstractStreamOperatorV2 implements 
StreamOperator {
+   /** The logger used by the operator class and its subclasses. */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractStreamOperatorV2.class);
+
+   protected final StreamConfig config;
+   protected final Output> output;
+   private final StreamingRuntimeContext runtimeContext;
+   private final ExecutionConfig executionConfig;
+   private final ClassLoader userCodeClassLoader;
+   private final CloseableRegistry cancelables;
+   private final long[] inputWatermarks;
+
+   /** Metric group for the operator. */
+   protected final OperatorMetricGroup metrics;
+   protected final LatencyStats latencyStats;
+   protected final ProcessingTimeService processingTimeService;
+
+   private StreamOperatorStateHandler stateHandler;
+
+   // We keep track of watermarks from both inputs, the combined input is 
the minimum
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396365770
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396349962
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
 ##
 @@ -32,14 +30,13 @@
  *
  * @param  The output type of the operator
  */
-@Internal
+@PublicEvolving
 public interface StreamOperatorFactory extends Serializable {
 
 Review comment:
   I don't see how this is related to the scope of the PR. Can you please 
explain the motivation?
   
   Also, I don't think this interface is ready to become a part of public API 
(because of the type parameter issue discussed below; and probably other 
issues).
   
   I see fixing this interface and making it public should be separate PR or 
two.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394916216
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing previous {@link 
AbstractStreamOperator}.
 
 Review comment:
   This would confuse me if I'd implement a new operator. Should I extend this 
class or `AbstractStreamOperator`?
   (annotations don't tell much: this one is "Experimental" but the other one - 
"PublicEvolving").
   
   I think we should direct users to v1 until we remove `@Experimental` (and 
deprecate v1).
   
   Maybe something like this:
   `New base class for all stream operators, intended to eventually replace 
{@link AbstractStreamOperator}.`
   ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394881417
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394877955
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394869466
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing previous {@link 
AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One note-able difference in comparison to {@link AbstractStreamOperator} 
is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in 
favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like 
{@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param  The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase implements StreamOperator {
+   /** The logger used by the operator class and its subclasses. */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorBase.class);
+
+   protected final StreamConfig config;
+   protected final Output> output;
+   private final StreamingRuntimeContext runtimeContext;
+   private final ExecutionConfig executionConfig;
+   private final ClassLoader userCodeClassLoader;
+   private final CloseableRegistry cancelables;
+   private final long[] inputWatermarks;
+
+   /** Metric group for the operator. */
+   protected final OperatorMetricGroup metrics;
+   protected final LatencyStats latencyStats;
+   protected final ProcessingTimeService processingTimeService;
+
+   private StreamOperatorStateHandler stateHandler;
+
+   // We keep track of watermarks from both inputs, the combined input is 
the minimum
+   // Once the minimum advances we emit a new wa

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393241368
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393258181
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
 ##
 @@ -32,14 +30,13 @@
  *
  * @param  The output type of the operator
  */
-@Internal
+@PublicEvolving
 public interface StreamOperatorFactory extends Serializable {
 
/**
 * Create the operator. Sets access to the context and the output.
 */
-   > T createStreamOperator(
-   StreamTask containingTask, StreamConfig config, 
Output> output);
+   > T 
createStreamOperator(StreamOperatorInitializer initializer);
 
 Review comment:
   Why do we allow client to specify return type (`T`)? 
   IMO, it's a factory who knows what it creates, except for strange cases of 
`SimpleOperatorFactory` and `CodeGenOperatorFactory` :)
   
   I see two options:
   1. parameterize `StreamOperatorFactory` with the return type (I tried - too 
many changes)
   1. return `StreamOperator` and move cast to the client; this is less 
casts and IMO confusion


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393239023
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing previous {@link 
AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One note-able difference in comparison to {@link AbstractStreamOperator} 
is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in 
favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like 
{@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param  The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase implements StreamOperator {
 
 Review comment:
   Or `AbstractStreamOperatorNg` :)
   I'd like to make it clear for somebody implementing new operator what's the 
difference and purpose of each - straight from names, without looking at 
annotations or javadocs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393245072
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393259043
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
 
 Review comment:
   👍 for extracting and grouping state-related operations


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393242720
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393252763
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorFactory.java
 ##
 @@ -58,11 +56,11 @@ public void setMailboxExecutor(MailboxExecutor 
mailboxExecutor) {
}
 
@Override
-   public StreamOperator createStreamOperator(StreamTask containingTask, 
StreamConfig config, Output output) {
+   public > T 
createStreamOperator(StreamOperatorInitializer initializer) {
 
 Review comment:
   +1, other candidates: `StreamOperatorCreationContext`, 
`StreamOperatorFactoryContext`
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393239873
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing previous {@link 
AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One note-able difference in comparison to {@link AbstractStreamOperator} 
is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in 
favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like 
{@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param  The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase implements StreamOperator {
+   /** The logger used by the operator class and its subclasses. */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorBase.class);
+
+   protected final StreamConfig config;
+   protected final Output> output;
+   private final StreamingRuntimeContext runtimeContext;
+   private final ExecutionConfig executionConfig;
+   private final ClassLoader userCodeClassLoader;
+   private final CloseableRegistry cancelables;
+   private final long[] inputWatermarks;
+
+   /** Metric group for the operator. */
+   protected final OperatorMetricGroup metrics;
+   protected final LatencyStats latencyStats;
+   protected final ProcessingTimeService processingTimeService;
+
+   private StreamOperatorStateHandler stateHandler;
+
+   // We keep track of watermarks from both inputs, the combined input is 
the minimum
+   // Once the minimum advances we emit a new wa

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393246019
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+  

[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-16 Thread GitBox
rkhachatryan commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r393246780
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+