[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397794870 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.IOUtils; + +import org.apache.flink.shaded.guava18.com.google.common.io.Closer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@Internal +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + } + + public void initializeOperatorState(CheckpointedStreamOperator streamOperator) throws Exception { + CloseableIterable keyedStateInputs = context.rawKeyedStateInputs(); + CloseableIterable operatorStateInputs = context.rawOperatorStateInputs(); + + try { + StateInitializationContext initializationContext = new StateInitializationContextImpl( +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397793382 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.IOUtils; + +import org.apache.flink.shaded.guava18.com.google.common.io.Closer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@Internal +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + } + + public void initializeOperatorState(CheckpointedStreamOperator streamOperator) throws Exception { + CloseableIterable keyedStateInputs = context.rawKeyedStateInputs(); + CloseableIterable operatorStateInputs = context.rawOperatorStateInputs(); + + try { + StateInitializationContext initializationContext = new StateInitializationContextImpl( +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397793382 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.IOUtils; + +import org.apache.flink.shaded.guava18.com.google.common.io.Closer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@Internal +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + } + + public void initializeOperatorState(CheckpointedStreamOperator streamOperator) throws Exception { + CloseableIterable keyedStateInputs = context.rawKeyedStateInputs(); + CloseableIterable operatorStateInputs = context.rawOperatorStateInputs(); + + try { + StateInitializationContext initializationContext = new StateInitializationContextImpl( +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397788433 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397138744 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.apache.flink.shaded.guava18.com.google.common.io.Closer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@Internal +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + } + + public void initializeOperatorState(ThrowingConsumer initializeOperatorAction) throws Exception { + CloseableIterable keyedStateInputs = context.rawKeyedStateInputs(); + CloseableIterable operatorStateInputs = context.rawOperatorStateInputs(); + + try { + StateInitializationContext initializationContext = new StateInitializationContextImpl( +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397089080 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397081550 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.apache.flink.shaded.guava18.com.google.common.io.Closer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@Internal +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + } + + public void initializeOperatorState(ThrowingConsumer initializeOperatorAction) throws Exception { + CloseableIterable keyedStateInputs = context.rawKeyedStateInputs(); + CloseableIterable operatorStateInputs = context.rawOperatorStateInputs(); + + try { + StateInitializationContext initializationContext = new StateInitializationContextImpl( +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397059586 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java ## @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; + +/** + * New base class for all stream operators, intended to eventually replace {@link AbstractStreamOperator}. + * Currently intended to work smoothly just with {@link MultipleInputStreamOperator}. + * + * One note-able difference in comparison to {@link AbstractStreamOperator} is lack of + * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation + * in the constructor, and removed some tight coupling with classes like {@link StreamTask}. + * + * Methods are guaranteed not to be called concurrently. + * + * @param The output type of the operator + */ +@Experimental +public abstract class AbstractStreamOperatorV2 implements StreamOperator { + /** The logger used by the operator class and its subclasses. */ + protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperatorV2.class); + + protected final StreamConfig config; + protected final Output> output; + private final StreamingRuntimeContext runtimeContext; + private final ExecutionConfig executionConfig; + private final ClassLoader userCodeClassLoader; + private final CloseableRegistry cancelables; + private final long[] inputWatermarks; + + /** Metric group for the operator. */ + protected final OperatorMetricGroup metrics; + protected final LatencyStats latencyStats; + protected final ProcessingTimeService processingTimeService; + + private StreamOperatorStateHandler stateHandler; + + // We keep track of watermarks from both inputs, the combined input is the minimum +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397053567 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r396562862 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r396521004 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java ## @@ -32,14 +30,13 @@ * * @param The output type of the operator */ -@Internal +@PublicEvolving public interface StreamOperatorFactory extends Serializable { Review comment: `@Experimental` looks good to me 👍 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r396394005 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.apache.flink.shaded.guava18.com.google.common.io.Closer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@Internal +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceM
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r396389898 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java ## @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; + +/** + * New base class for all stream operators, intended to eventually replace {@link AbstractStreamOperator}. + * Currently intended to work smoothly just with {@link MultipleInputStreamOperator}. + * + * One note-able difference in comparison to {@link AbstractStreamOperator} is lack of + * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation + * in the constructor, and removed some tight coupling with classes like {@link StreamTask}. + * + * Methods are guaranteed not to be called concurrently. + * + * @param The output type of the operator + */ +@Experimental +public abstract class AbstractStreamOperatorV2 implements StreamOperator { + /** The logger used by the operator class and its subclasses. */ + protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperatorV2.class); + + protected final StreamConfig config; + protected final Output> output; + private final StreamingRuntimeContext runtimeContext; + private final ExecutionConfig executionConfig; + private final ClassLoader userCodeClassLoader; + private final CloseableRegistry cancelables; + private final long[] inputWatermarks; + + /** Metric group for the operator. */ + protected final OperatorMetricGroup metrics; + protected final LatencyStats latencyStats; + protected final ProcessingTimeService processingTimeService; + + private StreamOperatorStateHandler stateHandler; + + // We keep track of watermarks from both inputs, the combined input is the minimum +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r396365770 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r396349962 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java ## @@ -32,14 +30,13 @@ * * @param The output type of the operator */ -@Internal +@PublicEvolving public interface StreamOperatorFactory extends Serializable { Review comment: I don't see how this is related to the scope of the PR. Can you please explain the motivation? Also, I don't think this interface is ready to become a part of public API (because of the type parameter issue discussed below; and probably other issues). I see fixing this interface and making it public should be separate PR or two. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r394916216 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java ## @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; + +/** + * New base class for all stream operators, replacing previous {@link AbstractStreamOperator}. Review comment: This would confuse me if I'd implement a new operator. Should I extend this class or `AbstractStreamOperator`? (annotations don't tell much: this one is "Experimental" but the other one - "PublicEvolving"). I think we should direct users to v1 until we remove `@Experimental` (and deprecate v1). Maybe something like this: `New base class for all stream operators, intended to eventually replace {@link AbstractStreamOperator}.` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r394881417 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r394877955 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r394869466 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java ## @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; + +/** + * New base class for all stream operators, replacing previous {@link AbstractStreamOperator}. + * Currently intended to work with {@link MultipleInputStreamOperator}. + * + * One note-able difference in comparison to {@link AbstractStreamOperator} is lack of + * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation + * in the constructor, and removed some tight coupling with classes like {@link StreamTask}. + * + * Methods are guaranteed not to be called concurrently. + * + * @param The output type of the operator + */ +@Experimental +public abstract class StreamOperatorBase implements StreamOperator { + /** The logger used by the operator class and its subclasses. */ + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorBase.class); + + protected final StreamConfig config; + protected final Output> output; + private final StreamingRuntimeContext runtimeContext; + private final ExecutionConfig executionConfig; + private final ClassLoader userCodeClassLoader; + private final CloseableRegistry cancelables; + private final long[] inputWatermarks; + + /** Metric group for the operator. */ + protected final OperatorMetricGroup metrics; + protected final LatencyStats latencyStats; + protected final ProcessingTimeService processingTimeService; + + private StreamOperatorStateHandler stateHandler; + + // We keep track of watermarks from both inputs, the combined input is the minimum + // Once the minimum advances we emit a new wa
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393241368 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393258181 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java ## @@ -32,14 +30,13 @@ * * @param The output type of the operator */ -@Internal +@PublicEvolving public interface StreamOperatorFactory extends Serializable { /** * Create the operator. Sets access to the context and the output. */ - > T createStreamOperator( - StreamTask containingTask, StreamConfig config, Output> output); + > T createStreamOperator(StreamOperatorInitializer initializer); Review comment: Why do we allow client to specify return type (`T`)? IMO, it's a factory who knows what it creates, except for strange cases of `SimpleOperatorFactory` and `CodeGenOperatorFactory` :) I see two options: 1. parameterize `StreamOperatorFactory` with the return type (I tried - too many changes) 1. return `StreamOperator` and move cast to the client; this is less casts and IMO confusion This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393239023 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java ## @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; + +/** + * New base class for all stream operators, replacing previous {@link AbstractStreamOperator}. + * Currently intended to work with {@link MultipleInputStreamOperator}. + * + * One note-able difference in comparison to {@link AbstractStreamOperator} is lack of + * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation + * in the constructor, and removed some tight coupling with classes like {@link StreamTask}. + * + * Methods are guaranteed not to be called concurrently. + * + * @param The output type of the operator + */ +@Experimental +public abstract class StreamOperatorBase implements StreamOperator { Review comment: Or `AbstractStreamOperatorNg` :) I'd like to make it clear for somebody implementing new operator what's the difference and purpose of each - straight from names, without looking at annotations or javadocs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393245072 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393259043 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { Review comment: 👍 for extracting and grouping state-related operations This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393242720 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393252763 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorFactory.java ## @@ -58,11 +56,11 @@ public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { } @Override - public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config, Output output) { + public > T createStreamOperator(StreamOperatorInitializer initializer) { Review comment: +1, other candidates: `StreamOperatorCreationContext`, `StreamOperatorFactoryContext` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393239873 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java ## @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; + +/** + * New base class for all stream operators, replacing previous {@link AbstractStreamOperator}. + * Currently intended to work with {@link MultipleInputStreamOperator}. + * + * One note-able difference in comparison to {@link AbstractStreamOperator} is lack of + * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation + * in the constructor, and removed some tight coupling with classes like {@link StreamTask}. + * + * Methods are guaranteed not to be called concurrently. + * + * @param The output type of the operator + */ +@Experimental +public abstract class StreamOperatorBase implements StreamOperator { + /** The logger used by the operator class and its subclasses. */ + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorBase.class); + + protected final StreamConfig config; + protected final Output> output; + private final StreamingRuntimeContext runtimeContext; + private final ExecutionConfig executionConfig; + private final ClassLoader userCodeClassLoader; + private final CloseableRegistry cancelables; + private final long[] inputWatermarks; + + /** Metric group for the operator. */ + protected final OperatorMetricGroup metrics; + protected final LatencyStats latencyStats; + protected final ProcessingTimeService processingTimeService; + + private StreamOperatorStateHandler stateHandler; + + // We keep track of watermarks from both inputs, the combined input is the minimum + // Once the minimum advances we emit a new wa
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393246019 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +
[GitHub] [flink] rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r393246780 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java ## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.util.CloseableIterable; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +/** + * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations. + */ +@PublicEvolving +public class StreamOperatorStateHandler { + + protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class); + + /** Backend for keyed state. This might be empty if we're not on a keyed stream. */ + @Nullable + private final AbstractKeyedStateBackend keyedStateBackend; + private final CloseableRegistry closeableRegistry; + @Nullable + private final DefaultKeyedStateStore keyedStateStore; + private final OperatorStateBackend operatorStateBackend; + private final InternalTimeServiceManager timeServiceManager; + private final StreamOperatorStateContext context; + + public StreamOperatorStateHandler( + StreamOperatorStateContext context, + ExecutionConfig executionConfig, + CloseableRegistry closeableRegistry) { + this.context = context; + operatorStateBackend = context.operatorStateBackend(); + keyedStateBackend = context.keyedStateBackend(); + this.closeableRegistry = closeableRegistry; + + if (keyedStateBackend != null) { + keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig); + } + else { + keyedStateStore = null; + } + + timeServiceManager = context.internalTimerServiceManager(); +