[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202517181 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java --- @@ -85,10 +85,10 @@ public int compare(T o1, T o2) { return ((Comparable) o1).compareTo(o2); } - // we catch this case before moving to more expensive tie breaks. - if (o1.equals(o2)) { - return 0; - } +// // we catch this case before moving to more expensive tie breaks. --- End diff -- For what reason do we need to comment this out? ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202519256 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -446,8 +485,10 @@ public String toString() { @Override public int numStateEntries() { int sum = 0; - for (StateTable stateTable : stateTables.values()) { - sum += stateTable.size(); + for (StateSnapshotRestore stateTable : registeredStates.values()) { --- End diff -- nit: the name `stateTable` is a bit confusing, since it is now a `RegisteredState` (which might not be a `StateTable`)... ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202517247 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java --- @@ -305,6 +351,6 @@ private void checkRefillCacheFromStore() { * after usage. */ @Nonnull - CloseableIterator orderedIterator(); + CloseableIterator orderedIterator();; --- End diff -- a duplicated `;` ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202516863 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java --- @@ -264,6 +265,42 @@ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) } } + public static StateSnapshotKeyGroupReader createKeyGroupPartitionReader( + @Nonnull ElementReaderFunction readerFunction, + @Nonnull KeyGroupElementsConsumer elementConsumer) { + return new PartitioningResultKeyGroupReader<>(readerFunction, elementConsumer); + } + + /** +* General algorithm to read key-grouped state that was written from a {@link PartitioningResult} +* @param --- End diff -- description for `T` is missing. ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202519024 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.KeyGroupPartitioner; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; +import org.apache.flink.runtime.state.StateSnapshot; +import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader; +import org.apache.flink.runtime.state.StateSnapshotRestore; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +/** + * This wrapper combines a HeapPriorityQueue with backend meta data. + * + * @param type of the queue elements. 
+ */ +public class HeapPriorityQueueSnapshotRestoreWrapper + implements StateSnapshotRestore { + + @Nonnull + private final HeapPriorityQueueSet priorityQueue; + @Nonnull + private final KeyExtractorFunction keyExtractorFunction; + @Nonnull + private final RegisteredPriorityQueueStateBackendMetaInfo metaInfo; + @Nonnull + private final KeyGroupRange localKeyGroupRange; + @Nonnegative + private final int totalKeyGroups; + + public HeapPriorityQueueSnapshotRestoreWrapper( + @Nonnull HeapPriorityQueueSet priorityQueue, + @Nonnull RegisteredPriorityQueueStateBackendMetaInfo metaInfo, + @Nonnull KeyExtractorFunction keyExtractorFunction, + @Nonnull KeyGroupRange localKeyGroupRange, + int totalKeyGroups) { + + this.priorityQueue = priorityQueue; + this.keyExtractorFunction = keyExtractorFunction; + this.metaInfo = metaInfo; + this.localKeyGroupRange = localKeyGroupRange; + this.totalKeyGroups = totalKeyGroups; + } + + @SuppressWarnings("unchecked") + @Nonnull + @Override + public StateSnapshot stateSnapshot() { + final T[] queueDump = (T[]) priorityQueue.toArray(new HeapPriorityQueueElement[priorityQueue.size()]); + + final TypeSerializer elementSerializer = metaInfo.getElementSerializer(); + + // turn the flat copy into a deep copy if required. + if (!elementSerializer.isImmutableType()) { + for (int i = 0; i < queueDump.length; ++i) { + queueDump[i] = elementSerializer.copy(queueDump[i]); + } + } + + return new HeapPriorityQueueStateSnapshot<>( + queueDump, + keyExtractorFunction, + metaInfo, --- End diff -- We only dump the queued elements here; should we also take a snapshot of the metaInfo, in case some part of it is not immutable? ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202518473 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java --- @@ -63,54 +72,46 @@ public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMe } @SuppressWarnings("unchecked") - public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { + public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { this( snapshot.getName(), OperatorStateHandle.Mode.valueOf( snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)), - (TypeSerializer) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER), - (TypeSerializer) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)); + (TypeSerializer) Preconditions.checkNotNull( + snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)), + (TypeSerializer) Preconditions.checkNotNull( + snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))); Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType()); } /** * Creates a deep copy of the itself. 
*/ - public RegisteredBroadcastBackendStateMetaInfo deepCopy() { - return new RegisteredBroadcastBackendStateMetaInfo<>(this); + @Nonnull + public RegisteredBroadcastStateBackendMetaInfo deepCopy() { + return new RegisteredBroadcastStateBackendMetaInfo<>(this); } @Nonnull @Override public StateMetaInfoSnapshot snapshot() { - Map optionsMap = Collections.singletonMap( - StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(), - assignmentMode.toString()); - Map> serializerMap = new HashMap<>(2); - Map serializerConfigSnapshotsMap = new HashMap<>(2); - String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(); - String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(); - serializerMap.put(keySerializerKey, keySerializer.duplicate()); - serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration()); - serializerMap.put(valueSerializerKey, valueSerializer.duplicate()); - serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration()); - - return new StateMetaInfoSnapshot( - name, - StateMetaInfoSnapshot.BackendStateType.BROADCAST, - optionsMap, - serializerConfigSnapshotsMap, - serializerMap); + if (precomputedSnapshot == null) { + precomputedSnapshot = precomputeSnapshot(); + } + return precomputedSnapshot; --- End diff -- What if the serializers are not all immutable? Should we need a `immutable` field for it? Only when it is true we return the `precomputeSnapshot`. ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202519294 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java --- @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nonnull; + +/** + * + * @param --- End diff -- Description for `T` is missing ---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982 @StephanEwen Thanks! Looking forward~ ---
[GitHub] flink issue #6306: [FLINK-9804][state] KeyedStateBackend.getKeys() does not ...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6306 @aljoscha Thanks for your quick review, will address your comments while merging. ---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6308#discussion_r201778299 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java --- @@ -98,8 +103,7 @@ public int getVersion() { @Override public int[] getCompatibleVersions() { - // we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x) - return new int[] {VERSION, 3, 2, 1}; + return new int[]{VERSION, 4, 3, 2, 1}; --- End diff -- nit: missing a ' ' between '[]' and '{'. ---
[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6301 Hi @jrthe42, in general a checkpoint consists of two parts of work. - part1: take a snapshot of the state. - part2: transfer the snapshot to the checkpoint destination (e.g. DFS). Part1 needs to be synchronous, and part2 can be asynchronous, if I'm not wrong. ---
[GitHub] flink issue #6306: [FLINK-9804][state] KeyedStateBackend.getKeys() does not ...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6306 CC @aljoscha ---
[GitHub] flink pull request #6306: [FLINK-9804][state] KeyedStateBackend.getKeys() do...
GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/6306 [FLINK-9804][state] KeyedStateBackend.getKeys() does not work on RocksDB MapState ## What is the purpose of the change *This PR fixes the bug that the KeyedStateBackend.getKeys() does not work on RocksDB MapState.* ## Brief change log - *Change `RocksDBKeyedStateBackend#RocksIteratorForKeysWrapper()` to let it support get keys for RocksDB MapState.* ## Verifying this change - *Added `StateBackendTestBase#testMapStateGetKeys()` to guard the changes* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - No You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK-9804 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6306.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6306 commit efc4096a4a6f38b3acb0b5189804f7b452218f23 Author: sihuazhou Date: 2018-07-11T12:35:22Z fix KeyedStateBackend.getKeys() for RocksDBMapState. ---
[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6301#discussion_r201676322 --- Diff: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java --- @@ -111,110 +134,117 @@ public void writeRecord(Row row) throws IOException { if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) { LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array..."); } - try { + synchronized (this) { --- End diff -- Why do we need to synchronize on `this` here? ---
[GitHub] flink pull request #6133: [FLINK-9351][Distributed Coordination] RM stop ass...
Github user sihuazhou closed the pull request at: https://github.com/apache/flink/pull/6133 ---
[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6228 +1 from my side ---
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6194 Hi @tillrohrmann I added a unit test `FsCheckpointStorageTest#testResolveCheckpointStorageLocation()` to guard the changes. But I still use one `mock` for creating two different file systems(one for checkpoint, another for savepoint), because without mock I can only create a LocalFileSystem in the test. Could you please have a look again? ---
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6194 @tillrohrmann Thanks for your review and suggestion, will change it tonight~ ---
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6194 @yanghua Thanks for the review, I rebased the PR. ---
[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6228 @StefanRRichter very nice PR, I only had some very minor comments. ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199325117 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.CloseableIterator; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; + +/** + * Interface for collection that gives in order access to elements w.r.t their priority. + * + * @param type of elements in the ordered set. + */ +@Internal +public interface InternalPriorityQueue { + + /** +* Retrieves and removes the first element (w.r.t. the order) of this set, +* or returns {@code null} if this set is empty. +* +* @return the first element of this ordered set, or {@code null} if this set is empty. +*/ + @Nullable + T poll(); + + /** +* Retrieves, but does not remove, the element (w.r.t. order) of this set, +* or returns {@code null} if this set is empty. +* +* @return the first element (w.r.t. 
order) of this ordered set, or {@code null} if this set is empty. +*/ + @Nullable + T peek(); + + /** +* Adds the given element to the set, if it is not already contained. +* +* @param toAdd the element to add to the set. +* @return true if the operation changed the head element or if is it unclear if the head element changed. +* Only returns false iff the head element was not changed by this operation. +*/ + boolean add(@Nonnull T toAdd); + + /** +* Removes the given element from the set, if is contained in the set. +* +* @param toRemove the element to remove. +* @return true if the operation changed the head element or if is it unclear if the head element changed. --- End diff -- nit: unclosed HTML tag `` ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199325112 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.CloseableIterator; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; + +/** + * Interface for collection that gives in order access to elements w.r.t their priority. + * + * @param type of elements in the ordered set. + */ +@Internal +public interface InternalPriorityQueue { + + /** +* Retrieves and removes the first element (w.r.t. the order) of this set, +* or returns {@code null} if this set is empty. +* +* @return the first element of this ordered set, or {@code null} if this set is empty. +*/ + @Nullable + T poll(); + + /** +* Retrieves, but does not remove, the element (w.r.t. order) of this set, +* or returns {@code null} if this set is empty. +* +* @return the first element (w.r.t. 
order) of this ordered set, or {@code null} if this set is empty. +*/ + @Nullable + T peek(); + + /** +* Adds the given element to the set, if it is not already contained. +* +* @param toAdd the element to add to the set. +* @return true if the operation changed the head element or if is it unclear if the head element changed. --- End diff -- nit: unclosed HTML tag `` ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199325523 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} + * based on RocksDB. + * + * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences + * produced by the provided serializer for the elements! + * + * @param the type of stored elements. + */ +public class RocksDBOrderedStore implements CachingInternalPriorityQueueSet.OrderedSetStore { + + /** Serialized empty value to insert into RocksDB. */ + private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET); + + /** The RocksDB instance that serves as store. */ + @Nonnull + private final RocksDB db; + + /** Handle to the column family of the RocksDB instance in which the elements are stored. */ + @Nonnull + private final ColumnFamilyHandle columnFamilyHandle; + + /** Read options for RocksDB. */ + @Nonnull + private final ReadOptions readOptions; + + /** +* Serializer for the contained elements. 
The lexicographical order of the bytes of serialized objects must be +* aligned with their logical order. +*/ + @Nonnull + private final TypeSerializer byteOrderProducingSerializer; + + /** Wrapper to batch all writes to RocksDB. */ + @Nonnull + private final RocksDBWriteBatchWrapper batchWrapper; + + /** The key-group id of all elements stored in this instance. */ + @Nonnegative + private final int keyGroupId; + + /** The key-group id in serialized form. */ + @Nonnull + private final byte[] groupPrefixBytes; + + /** Output stream that helps to serialize elements. */ + @Nonnull + private final ByteArrayOutputStreamWithPos outputStream; + + /** Output view that helps to serialize elements, must wrap the output stream. */ + @Nonnull + private final DataOutputViewStreamWrapper outputView; + + public RocksDBOrderedStore( + @Nonnegative int keyGroupId, + @Nonnull RocksDB db, + @Nonnull ColumnFamilyHandle columnFamilyHandle, + @Nonnull ReadOptions readOptions, + @Nonnull TypeSerializer byteOrderProducingSerializer, + @Nonnull ByteArrayOutputStreamWithPos outputStream, + @Nonnull DataOutputViewStreamWrapper outputView, + @Nonnull RocksDBWriteBatchWrapper batchWrapper) { + this.db = db; + this.columnFamilyHandle = columnFamilyHandle; +
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199325172 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.InternalPriorityQueue; +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.IOUtils; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Comparator; + +/** + * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and + * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set + * semantics. + * + * @param the type of elements in the queue. 
+ * @param type type of sub-queue used for each key-group partition. + */ +public class KeyGroupPartitionedPriorityQueue & HeapPriorityQueueElement> + implements InternalPriorityQueue { + + /** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/ + @Nonnull + private final HeapPriorityQueue keyGroupHeap; + + /** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */ + @Nonnull + private final PQ[] keyGroupLists; + + /** Function to extract the key from contained elements. */ + @Nonnull + private final KeyExtractorFunction keyExtractor; + + /** The total number of key-groups (in the job). */ + @Nonnegative + private final int totalKeyGroups; + + /** The smallest key-group id with a subpartition managed by this ordered set. */ + @Nonnegative + private final int firstKeyGroup; + + @SuppressWarnings("unchecked") + public KeyGroupPartitionedPriorityQueue( + @Nonnull KeyExtractorFunction keyExtractor, + @Nonnull Comparator elementComparator, + @Nonnull PartitionQueueSetFactory orderedCacheFactory, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalKeyGroups) { + + this.keyExtractor = keyExtractor; + this.totalKeyGroups = totalKeyGroups; + this.firstKeyGroup = keyGroupRange.getStartKeyGroup(); + this.keyGroupLists = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()]; + this.keyGroupHeap = new HeapPriorityQueue<>( + new InternalPriorityQueueComparator<>(elementComparator), + keyGroupRange.getNumberOfKeyGroups()); + for (int i = 0; i < keyGroupLists.length; i++) { + final PQ keyGroupCache = + orderedCacheFactory.create(firstKeyGroup + i, elementComparator); + keyGroupLists[i] = keyGroupCache; + keyGroupHeap.add(keyGroupCache); + } + } + + @Nullable + @Override + public T poll() { + final PQ headList = keyGroupHeap.peek(); + final T head = headList.poll(); + keyGroupHeap.adjustModifiedElement(headList); + return head; + } + + @Nullable + @Override + public T peek() { + return 
keyGroupHeap.peek().peek(); + } + + @Override + public boolean add(@Nonnull T toAdd) { + final PQ list = getListForElementKeyGroup(toAdd); + + // the branch checks if the head element has
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199325190 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} + * based on RocksDB. + * + * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences + * produced by the provided serializer for the elements! + * + * @param the type of stored elements. + */ +public class RocksDBOrderedStore implements CachingInternalPriorityQueueSet.OrderedSetStore { --- End diff -- Does `RocksDBOrderedStore` mean `RocksDBOrderedSetStore`? ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199320321 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} + * based on RocksDB. + * + * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences + * produced by the provided serializer for the elements! + * + * @param the type of stored elements. + */ +public class RocksDBOrderedStore implements CachingInternalPriorityQueueSet.OrderedSetStore { + + /** Serialized empty value to insert into RocksDB. */ + private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET); + + /** The RocksDB instance that serves as store. */ + @Nonnull + private final RocksDB db; + + /** Handle to the column family of the RocksDB instance in which the elements are stored. */ + @Nonnull + private final ColumnFamilyHandle columnFamilyHandle; + + /** Read options for RocksDB. */ + @Nonnull + private final ReadOptions readOptions; + + /** +* Serializer for the contained elements. 
The lexicographical order of the bytes of serialized objects must be +* aligned with their logical order. +*/ + @Nonnull + private final TypeSerializer byteOrderProducingSerializer; + + /** Wrapper to batch all writes to RocksDB. */ + @Nonnull + private final RocksDBWriteBatchWrapper batchWrapper; + + /** The key-group id of all elements stored in this instance. */ + @Nonnegative + private final int keyGroupId; + + /** The key-group id in serialized form. */ + @Nonnull + private final byte[] groupPrefixBytes; + + /** Output stream that helps to serialize elements. */ + @Nonnull + private final ByteArrayOutputStreamWithPos outputStream; + + /** Output view that helps to serialize elements, must wrap the output stream. */ + @Nonnull + private final DataOutputViewStreamWrapper outputView; + + public RocksDBOrderedStore( + @Nonnegative int keyGroupId, + @Nonnull RocksDB db, + @Nonnull ColumnFamilyHandle columnFamilyHandle, + @Nonnull ReadOptions readOptions, + @Nonnull TypeSerializer byteOrderProducingSerializer, + @Nonnull ByteArrayOutputStreamWithPos outputStream, + @Nonnull DataOutputViewStreamWrapper outputView, + @Nonnull RocksDBWriteBatchWrapper batchWrapper) { + this.db = db; + this.columnFamilyHandle = columnFamilyHandle; +
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199320135 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java --- @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.InternalPriorityQueue; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; + +/** + * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of + * two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth + * about the elements in this queue. The second storage is a (fast) ordered set cache, typically with some limited + * capacity. The cache is used to improve performance of accesses to the underlying store and contains contains an --- End diff -- duplicated `contains` ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199320254 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} + * based on RocksDB. + * + * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences + * produced by the provided serializer for the elements! + * + * @param the type of stored elements. + */ +public class RocksDBOrderedStore implements CachingInternalPriorityQueueSet.OrderedSetStore { + + /** Serialized empty value to insert into RocksDB. */ + private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET); + + /** The RocksDB instance that serves as store. */ + @Nonnull + private final RocksDB db; + + /** Handle to the column family of the RocksDB instance in which the elements are stored. */ + @Nonnull + private final ColumnFamilyHandle columnFamilyHandle; + + /** Read options for RocksDB. */ + @Nonnull + private final ReadOptions readOptions; + + /** +* Serializer for the contained elements. 
The lexicographical order of the bytes of serialized objects must be +* aligned with their logical order. +*/ + @Nonnull + private final TypeSerializer byteOrderProducingSerializer; + + /** Wrapper to batch all writes to RocksDB. */ + @Nonnull + private final RocksDBWriteBatchWrapper batchWrapper; + + /** The key-group id of all elements stored in this instance. */ + @Nonnegative + private final int keyGroupId; + + /** The key-group id in serialized form. */ + @Nonnull + private final byte[] groupPrefixBytes; + + /** Output stream that helps to serialize elements. */ + @Nonnull + private final ByteArrayOutputStreamWithPos outputStream; + + /** Output view that helps to serialize elements, must wrap the output stream. */ + @Nonnull + private final DataOutputViewStreamWrapper outputView; + + public RocksDBOrderedStore( + @Nonnegative int keyGroupId, + @Nonnull RocksDB db, + @Nonnull ColumnFamilyHandle columnFamilyHandle, + @Nonnull ReadOptions readOptions, + @Nonnull TypeSerializer byteOrderProducingSerializer, + @Nonnull ByteArrayOutputStreamWithPos outputStream, + @Nonnull DataOutputViewStreamWrapper outputView, + @Nonnull RocksDBWriteBatchWrapper batchWrapper) { + this.db = db; + this.columnFamilyHandle = columnFamilyHandle; +
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199320146 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java --- @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.InternalPriorityQueue; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; + +/** + * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of + * two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth + * about the elements in this queue. The second storage is a (fast) ordered set cache, typically with some limited + * capacity. The cache is used to improve performance of accesses to the underlying store and contains contains an + * ordered (partial) view on the top elements in the ordered store. 
We are currently applying simple write-trough --- End diff -- typo: `write-trough` -> `write-through` ---
[GitHub] flink issue #6125: [FLINK-9532] Flink Overview of Jobs Documentation Incorre...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6125 @yanghua sorry, I'm not a native English speaker, so I don't think I'm the right person to review this... ---
[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6132 Hi @tillrohrmann, I updated the PR. Could you please have a look again? ---
[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6132#discussion_r199315500 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor operatorBackPressureStats.orElse(null))); } + @Override + public void taskManagerTerminated(ResourceID resourceID, Set allocationIds, Exception cause) { --- End diff -- My previous thought was that the `RM` needed to notify the `JM` of the `allocationIds` that were assigned to it, because it was possible that the `SlotManager` had already assigned slots to the `JM` but the `TM` was killed before the `JM` established a connection. This was mainly to address the issue in https://issues.apache.org/jira/browse/FLINK-9351, but I think the approach you suggested also fixes the problem in FLINK-9351 as a side effect. ---
[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6132 @tillrohrmann Thanks for your review and good suggestions; I'm changing the code accordingly. ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r198146934 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put an event to cache. +* @param eventId id of the event +* @param event event body +*/ + private void cacheEvent(EventId eventId, Lockable event) { + this.eventsBufferCache.put(eventId, event); + } + + /** +* Put a ShareBufferNode to cache. +* @param nodeId id of the event +* @param entry SharedBufferNode +*/ + private void cacheEntry(NodeId nodeId, Lockable entry) { + this.entryCache.put(nodeId, entry); + } + + / + // Get + / + + /** +* Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. +* @param nodeId id of the event +* @return SharedBufferNode +* @throws Exception Thrown if the system cannot access the state. +*/ + private Lockable getEntry(NodeId nodeId) throws Exception { + Lockable entry = entryCache.get(nodeId); + return entry != null ? entry : entries.get(nodeId); + } + + private Lockable getEvent(EventId eventId) throws Exception { + Lockable event = eventsBufferCache.get(eventId); + return event != null ? event : eventsBuffer.get(eventId); + } + + /** +* Flush the event and node in map to state. +* @throws Exception Thrown if the system cannot access the state. 
+*/ + public void flushCache() throws Exception { + entryCache.forEach((k, v) -> { + try { + entries.put(k, v); + } catch (Exception e) { + throw new RuntimeException(); + } + } + ); + + eventsBufferCache.forEach((k, v) -> { + try { + eventsBuffer.put(k, v); + } catch (Exception e) { + throw new RuntimeException(); --- End diff -- In fact, what I mean is that if you want to throw an exception here, you could throw it as `throw new RuntimeException("exception message", originalException)`; this way the original exception won't be swallowed. ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r198146056 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put an event to cache. +* @param eventId id of the event +* @param event event body +*/ + private void cacheEvent(EventId eventId, Lockable event) { + this.eventsBufferCache.put(eventId, event); + } + + /** +* Put a ShareBufferNode to cache. +* @param nodeId id of the event +* @param entry SharedBufferNode +*/ + private void cacheEntry(NodeId nodeId, Lockable entry) { + this.entryCache.put(nodeId, entry); + } + + / + // Get + / + + /** +* Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. +* @param nodeId id of the event +* @return SharedBufferNode +* @throws Exception Thrown if the system cannot access the state. +*/ + private Lockable getEntry(NodeId nodeId) throws Exception { + Lockable entry = entryCache.get(nodeId); + return entry != null ? entry : entries.get(nodeId); + } + + private Lockable getEvent(EventId eventId) throws Exception { + Lockable event = eventsBufferCache.get(eventId); + return event != null ? event : eventsBuffer.get(eventId); + } + + /** +* Flush the event and node in map to state. +* @throws Exception Thrown if the system cannot access the state. 
+*/ + public void flushCache() throws Exception { + entryCache.forEach((k, v) -> { --- End diff -- Yes. ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r198135717 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put an event to cache. +* @param eventId id of the event +* @param event event body +*/ + private void cacheEvent(EventId eventId, Lockable event) { + this.eventsBufferCache.put(eventId, event); + } + + /** +* Put a ShareBufferNode to cache. +* @param nodeId id of the event +* @param entry SharedBufferNode +*/ + private void cacheEntry(NodeId nodeId, Lockable entry) { + this.entryCache.put(nodeId, entry); + } + + / + // Get + / + + /** +* Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. +* @param nodeId id of the event +* @return SharedBufferNode +* @throws Exception Thrown if the system cannot access the state. +*/ + private Lockable getEntry(NodeId nodeId) throws Exception { + Lockable entry = entryCache.get(nodeId); + return entry != null ? entry : entries.get(nodeId); + } + + private Lockable getEvent(EventId eventId) throws Exception { + Lockable event = eventsBufferCache.get(eventId); + return event != null ? event : eventsBuffer.get(eventId); + } + + /** +* Flush the event and node in map to state. +* @throws Exception Thrown if the system cannot access the state. 
+*/ + public void flushCache() throws Exception { + entryCache.forEach((k, v) -> { --- End diff -- I would suggest using `entries.putAll()` instead, since it could give better performance when you are using the RocksDB backend. ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r198136425 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put an event to cache. +* @param eventId id of the event +* @param event event body +*/ + private void cacheEvent(EventId eventId, Lockable event) { + this.eventsBufferCache.put(eventId, event); + } + + /** +* Put a ShareBufferNode to cache. +* @param nodeId id of the event +* @param entry SharedBufferNode +*/ + private void cacheEntry(NodeId nodeId, Lockable entry) { + this.entryCache.put(nodeId, entry); + } + + / + // Get + / + + /** +* Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. +* @param nodeId id of the event +* @return SharedBufferNode +* @throws Exception Thrown if the system cannot access the state. +*/ + private Lockable getEntry(NodeId nodeId) throws Exception { + Lockable entry = entryCache.get(nodeId); + return entry != null ? entry : entries.get(nodeId); + } + + private Lockable getEvent(EventId eventId) throws Exception { + Lockable event = eventsBufferCache.get(eventId); + return event != null ? event : eventsBuffer.get(eventId); + } + + /** +* Flush the event and node in map to state. +* @throws Exception Thrown if the system cannot access the state. 
+*/ + public void flushCache() throws Exception { + entryCache.forEach((k, v) -> { + try { + entries.put(k, v); + } catch (Exception e) { + throw new RuntimeException(); + } + } + ); + + eventsBufferCache.forEach((k, v) -> { + try { + eventsBuffer.put(k, v); + } catch (Exception e) { + throw new RuntimeException(); --- End diff -- Same here, do not swallow the original Exception. ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r198135796 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put an event to cache. +* @param eventId id of the event +* @param event event body +*/ + private void cacheEvent(EventId eventId, Lockable event) { + this.eventsBufferCache.put(eventId, event); + } + + /** +* Put a ShareBufferNode to cache. +* @param nodeId id of the event +* @param entry SharedBufferNode +*/ + private void cacheEntry(NodeId nodeId, Lockable entry) { + this.entryCache.put(nodeId, entry); + } + + / + // Get + / + + /** +* Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. +* @param nodeId id of the event +* @return SharedBufferNode +* @throws Exception Thrown if the system cannot access the state. +*/ + private Lockable getEntry(NodeId nodeId) throws Exception { + Lockable entry = entryCache.get(nodeId); + return entry != null ? entry : entries.get(nodeId); + } + + private Lockable getEvent(EventId eventId) throws Exception { + Lockable event = eventsBufferCache.get(eventId); + return event != null ? event : eventsBuffer.get(eventId); + } + + /** +* Flush the event and node in map to state. +* @throws Exception Thrown if the system cannot access the state. 
+*/ + public void flushCache() throws Exception { + entryCache.forEach((k, v) -> { + try { + entries.put(k, v); + } catch (Exception e) { + throw new RuntimeException(); + } + } + ); + + eventsBufferCache.forEach((k, v) -> { --- End diff -- Same here. ---
[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r198136431 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable eventWrapper = eventsBuffer.get(eventId); + Lockable eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + / + // Put + / + + /** +* Put an event to cache. +* @param eventId id of the event +* @param event event body +*/ + private void cacheEvent(EventId eventId, Lockable event) { + this.eventsBufferCache.put(eventId, event); + } + + /** +* Put a ShareBufferNode to cache. +* @param nodeId id of the event +* @param entry SharedBufferNode +*/ + private void cacheEntry(NodeId nodeId, Lockable entry) { + this.entryCache.put(nodeId, entry); + } + + / + // Get + / + + /** +* Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. +* @param nodeId id of the event +* @return SharedBufferNode +* @throws Exception Thrown if the system cannot access the state. +*/ + private Lockable getEntry(NodeId nodeId) throws Exception { + Lockable entry = entryCache.get(nodeId); + return entry != null ? entry : entries.get(nodeId); + } + + private Lockable getEvent(EventId eventId) throws Exception { + Lockable event = eventsBufferCache.get(eventId); + return event != null ? event : eventsBuffer.get(eventId); + } + + /** +* Flush the event and node in map to state. +* @throws Exception Thrown if the system cannot access the state. 
+*/ + public void flushCache() throws Exception { + entryCache.forEach((k, v) -> { + try { + entries.put(k, v); + } catch (Exception e) { + throw new RuntimeException(); --- End diff -- I would suggest not swallowing the original exception here. ---
[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6208#discussion_r198129785 --- Diff: flink-tests/src/test/resources/log4j-test.properties --- @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -log4j.rootLogger=OFF, testlogger +log4j.rootLogger=INFO, testlogger --- End diff -- This looks like it should be reverted. ---
[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6208#discussion_r198129120 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java --- @@ -75,10 +82,21 @@ public static DefaultSlotPoolFactory fromConfiguration( final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration); final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT)); + final SchedulingStrategy schedulingStrategy = selectSchedulingStrategy(configuration); + return new DefaultSlotPoolFactory( rpcService, + schedulingStrategy, SystemClock.getInstance(), rpcTimeout, slotIdleTimeout); } + + private static SchedulingStrategy selectSchedulingStrategy(Configuration configuration) { + if (configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY)) { + return PreviousAllocationSchedulingStrategy.getInstance(); + } else { + return LocationPreferenceSchedulingStrategy.getInstance(); --- End diff -- Does it make sense to use the `LocationPreferenceSchedulingStrategy` for the initial scheduling even when "local recovery" is enabled? ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198015825 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return 
createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? + originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + List originalValues = new ArrayList(); + for (TypeSerializer typeSerializer : originalSerializers) { + originalValues.add(typeSerializer.deserialize(source)); + } + return composeValueInternal(originalValues); + } + + @Override + public T deseriali
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198015848 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return 
createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? + originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + List originalValues = new ArrayList(); + for (TypeSerializer typeSerializer : originalSerializers) { + originalValues.add(typeSerializer.deserialize(source)); + } + return composeValueInternal(originalValues); + } + + @Override + public T deseriali
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198015385 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return 
createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) --- End diff -- Same here might result in a bad performance because of the random access of the list. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198015282 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return 
createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) --- End diff -- I would suggest to use the `Iterator` instead of `xxx.get(i)` here, because if the `originalSerializers` (or the other instances) is type of LinkedList the performance could be bad. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198015105 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return 
createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? --- End diff -- nit: if we don't use the high level API, we could do this in a single loop with a better performance. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198014886 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return 
createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? + originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); --- End diff -- If `originalSerializers` or the `originalValues` is a type of something like `LinkedList`, then the `originalSerializers.get(i)` and `originalValues.get(i)` will get a very poor performance. I think we might should use the `iterator` here. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198014021 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; --- End diff -- I see, but this looks a bit weird, it looks like that the `stateFactories` is only used for looking up the corresponding state factory for the `stateDesc`, and we need to firstly create it every time when calling the `createStateAndWrapWithTtlIfEnabled()`, the flow of the `createStateAndWrapWithTtlIfEnabled()` looks like: - create a map of `state factory` (stateFactories). - use the stateFactories to look up the corresponding state factory for the `stateDesc`. - ... In that case, maybe use the `switch case` to find the corresponding state factory is better, at least we don't need to firstly create a map of `state factory` for every call this way. What do you think? ---
[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6186 @StefanRRichter Thanks for your reply! I think that makes sense, there is still a workaround for the user to go. `+1` to implement the current approach first. ---
[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6186 Maybe let me elaborate on the TTL checking condition in detail, overall the checking condition contains two parts and looks like `(current_ts - update_ts) - time_shift_offset >= TTL`. The `time_shift_offset` is the shift offset that we should apply when checking the TTL. - For the records that the `update_ts` > `checkpoint_ts`, we could know they were created (or updated) after the last restoring so we don't need to apply any shift to them. So that shift offset is `0`. - For the records that the `update_ts` <= `checkpoint_ts`, we could know they were created (or updated) before the last restoring so we need to apply the shift to them, the shift offset is `recovery_ts - checkpoint_ts`. In our current code, we didn't do the time alignment, which is equivalent to a special case of the above condition where the `time_shift_offset` is always `0`. ---
[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6186 I'm still a bit worried about the time-align problem on recovery (because I've met several cases that would become a disaster in production if we don't do the time-align on recovery). (One instance: we used the DFS to store the checkpoint data, and the DFS went into safe-mode because of some problems, we took several hours to notice that and also took some time to address the issue. After addressing DFS's issue, user's jobs were resumed and began to run correctly. In this case, if we don't do the time-align on recovery, then user's state may already be totally expired (when TTL <= the `system down time`).) I had a second thought on this problem, and I think maybe we could do that without a full scanning of the records, the approach is outlined below. - 1. We need to remember the timestamp when performing the checkpoint, let's call it `checkpoint_ts`. - 2. We also need to remember the timestamp when recovering from the checkpoint, let's call it `recovery_ts`. - 3. For each record, we remember its last update timestamp, let's call it `update_ts`. - 4. And the current timestamp is `current_ts`. - 5. Then we could use the following condition `checkpoint_ts - update_ts + current_ts - recovery_ts >= TTL` to check whether the record is expired. If it's true then the record is expired, otherwise the record is still alive. What do you think? @azagrebin , and @StefanRRichter would be really nice to learn your opinion about this problem. ---
[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6186 @azagrebin thanks for addressing the concerns, looks good from my side now. let's wait for @StefanRRichter 's review. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197329976 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return 
createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? + originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + List originalValues = new ArrayList(); --- End diff -- I would suggest to give a init size for `originalValues`, e.g. `List originalValues = new ArrayList(originalSerializers.size());`. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197331145 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { --- End diff -- It would be better to check that the args are not null, or simply to use the `@Nonnull` annotation. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197330095 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return 
createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? + originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + List originalValues = new ArrayList(); + for (TypeSerializer typeSerializer : originalSerializers) { + originalValues.add(typeSerializer.deserialize(source)); + } + return composeValueInternal(originalValues); + } + + @Override + public T deseriali
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197330813 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java --- @@ -48,7 +48,8 @@ KeyedStateBackend, Snapshotable, Collection>, Closeable, - CheckpointListener { + CheckpointListener, + KeyedStateFactory{ --- End diff -- I think a space is missing before the `{` here. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197331820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + + private IS createState( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception { + StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197329533 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); --- End diff -- I think this check is a bug: it compares `originalSerializers.size()` with itself, so it is always true. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197331211 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? + originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; --- End diff -- Why couldn't this be static? ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197331898 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + + private IS createState( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception { + StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197330596 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return 
createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? + originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + List originalValues = new ArrayList(); + for (TypeSerializer typeSerializer : originalSerializers) { + originalValues.add(typeSerializer.deserialize(source)); + } + return composeValueInternal(originalValues); + } + + @Override + public T deseriali
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197331741 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + + private IS createState( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception { + StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197331764 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + + private IS createState( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception { + StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197192870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); + Preconditions.checkArgument(ttl.toMilliseconds() >= 0, --- End diff -- Maybe we should first check that `ttl` is not null. Also, I wonder whether `ttl.toMilliseconds() == 0` makes any sense? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197188305 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- Okay, I see this is tricky, I agree that this should be addressed in another PR. We need to figure out a proper way to do that. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197114442 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- Additionally, if we don't do the time alignment on recovery, then what is a safe `TTL` value to set for a job? (This is the question that users always ask us when they try to use the `TTL` (we implemented it in a hacky way based on `TtlDB`) to control the state's size.) ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197111980 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- I'm really not sure whether we should leave it for now. If we leave it for now, then it will be a headache problem on practical production. As a very common situation there is a job, which reading data from kafka, and the user set the `TTL = 2hours` because he thinks that the data's latency is absolute less than 2 hours, this way they can use the TTL to safely control the whole state size, and got a exactly result. But, if he found that the job need to scale up, then he need to trigger a savepoint and rescale the job from it. 
But what if some problem stops him from recovering the job from the savepoint within a short time? Let's say it takes him 30 minutes to recover the job; then the result becomes inaccurate. Even if the user never needs to trigger a savepoint for any reason, what if the job runs into some problem (maybe a problem with some machine) and loops in "failed-restart-failed-.."? After 2 hours we fix the problem and the job automatically resumes, but all of the state has expired. I think this is a disaster for the user. Yes, when using `EventTime` people won't have this problem, but `ProcessingTime` is a very common use case (in our production, the TimeCharacteristic of most jobs is `ProcessingTime`). I know Flink's TimeService also doesn't do the time alignment on recovery, but state TTL is a bit different from a Timer. When registering a timer, what users give the API is an absolute time, but when setting the TTL, what users give is just a relative time; it's us who convert the relative time to an absolute time to implement the TTL. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197080576 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- If we need to "update on 
read", then this is a bit confusing to me. Currently we attach a TTL to every list item, so the "update on read" should be scoped to the list item, not the whole list. So an `iterable` argument for `updateTs` seems more reasonable to me. What do you think? ---
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6194 CC @zentol ---
[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...
GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/6194 [FLINK-9633][checkpoint] Use savepoint path's file system to create checkpoint output stream ## What is the purpose of the change *This PR fixes Flink doesn't use the savepoint path's filesystem to create the output stream on TM side.* ## Brief change log - *Use Savepoint path's file system to create checkpoint output stream.* ## Verifying this change - *Added `StreamTaskTest#testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint()` to verify the changes* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - No You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK-9633 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6194.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6194 commit f006416f652485bd59124a57774fdeaafa81824b Author: sihuazhou Date: 2018-06-21T09:42:54Z Use Savepoint path's file system to create checkpoint output stream. ---
[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r197008798 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java --- @@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe return; } - ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); - - R request; - if (isFileUpload()) { - final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get(); - if (path == null) { - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Client did not upload a file."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; - } - //noinspection unchecked - request = (R) new FileUpload(path); - } else if (msgContent.capacity() == 0) { - try { - request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); - } catch (JsonParseException | JsonMappingException je) { - log.error("Request did not conform to expected format.", je); - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Bad request received."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; + final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); + + try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) { + + R request; + if (msgContent.capacity() == 0) { + try { + request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); + } catch (JsonParseException | JsonMappingException je) { + log.error("Request did not conform to expected format.", je); + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody("Bad request received."), + HttpResponseStatus.BAD_REQUEST, + responseHeaders); + return; + } + } else { + try { + ByteBufInputStream in = new ByteBufInputStream(msgContent); --- End diff -- I would suggest to use try-with-resource to make sure to close `in`. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197001110 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalReducingState; + +import java.util.Collection; + +/** + * This class wraps reducing state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user value of state with TTL + */ +class TtlReducingState + extends AbstractTtlState, InternalReducingState>> + implements InternalReducingState { + TtlReducingState( + InternalReducingState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public T get() throws Exception { + return getInternal(); + } + + @Override + public void add(T value) throws Exception { + original.add(wrapWithTs(value, Long.MAX_VALUE)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- Again, Should we also do the TTL check for original.mergeNamespaces()? Since we need to query the state when merging namespaces. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196995820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- Again, should we also do the `TTL` check for `original.mergeNamespaces()`? Since we need to query the state when merging namespaces. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196998472 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- If we've called 
`getInternal()` in `get()`, and made `updateTs()` accept an `Iterable`, then it seems this method could be removed (or at least, we should add a check for whether the `iterable` is an instance of `List`; if so, we could cast it to `List` and return immediately). ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197004891 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- The `expirationTimestamp` is an absolute timestamp; should we shift the timestamp of `TtlValue` on checkpoint & recovery? For example, suppose a user uses `ProcessingTime` as the TimeCharacteristic and sets `TTL = 10min`. For some reason, he triggers a savepoint, and 11 minutes later he recovers the job from the savepoint. If we don't shift the timestamp, then all the state will be expired. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197001339 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalAggregatingState; + +import java.util.Collection; + +/** + * This class wraps aggregating state with TTL logic. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the value added to the state + * @param The type of the accumulator (intermediate aggregate state). 
+ * @param Type of the value extracted from the state + * + */ +class TtlAggregatingState + extends AbstractTtlState, InternalAggregatingState, OUT>> + implements InternalAggregatingState { + + TtlAggregatingState( + InternalAggregatingState, OUT> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer valueSerializer, + TtlAggregateFunction aggregateFunction) { + super(originalState, config, timeProvider, valueSerializer); + aggregateFunction.stateClear = originalState::clear; + aggregateFunction.updater = originalState::updateInternal; + } + + @Override + public OUT get() throws Exception { + return original.get(); + } + + @Override + public void add(IN value) throws Exception { + original.add(value); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public ACC getInternal() throws Exception { + return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal); + } + + @Override + public void updateInternal(ACC valueToStore) throws Exception { + original.updateInternal(wrapWithTs(valueToStore)); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- Should we also do the TTL check for original.mergeNamespaces()? Since we need to query the state when merging namespaces. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196996094 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); --- End diff -- This looks a bit weird, my gut feeling is that we should call `getInternal()` 
in `get()` (as we called `updateInternal()` in `update()` in this class), but here it is the reverse. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196996839 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); --- End diff -- In this block, we need to iterate the `ttlValue` twice, one for `collect()` and one for `updateTs()`. If we could make the updateTs to accept `Iterable` as the argument, then we can avoiding the `collect()` here, this way we only need to iterate the `ttlValue` once. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196995095 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); --- End diff -- Maybe we should also check that the `ttl` is greater than 0? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196827863 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; --- End diff -- Oh sorry, my bad, I'm misunderstand... ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196820474 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { + return wrapWithTs(value, newExpirationTimestamp()); + } + + static TtlValue wrapWithTs(V value, long ts) { + return value == null ? null : new TtlValue<>(value, ts); + } + +TtlValue rewrapWithNewTs(TtlValue ttlValue) { + return wrapWithTs(ttlValue.getUserValue()); + } + + private long newExpirationTimestamp() { + long currentTs = timeProvider.currentTimestamp(); + long ttl = config.getTtl().toMilliseconds(); --- End diff -- This will be called a lot often, so does it make sense to introduce a field to remember the `config.getTtl().toMilliseconds()`? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196809755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { + return wrapWithTs(value, newExpirationTimestamp()); + } + + static TtlValue wrapWithTs(V value, long ts) { + return value == null ? null : new TtlValue<>(value, ts); + } + +TtlValue rewrapWithNewTs(TtlValue ttlValue) { + return wrapWithTs(ttlValue.getUserValue()); + } + + private long newExpirationTimestamp() { + return timeProvider.currentTimestamp() + config.getTtl().toMilliseconds(); --- End diff -- This will be called a lot often, so does it make sense to introduce a field to remember the `config.getTtl().toMilliseconds()`? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196817846 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; --- End diff -- The var `finalResult` looks like redundant or I'm misunderstand. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196816259 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { + return 
StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList()); + } + + @Override + public void updateInternal(List valueToStore) throws Exception { + Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null."); + original.addAll(withTs(valueToStore)); --- End diff -- This seems to miss a `clear()`. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196791555 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); --- End diff -- Why not checking for `ttl`? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196786370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- Does it make sense to never expire the value when the `ttValue.getExpirationTimestamp()` return negative? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196784275 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- This looks like a bit problematic, because the `ttlValue.getExpirationTimestamp()` might be negative. E.g when the user provide `Long.MAX_VALUE` as the TTL value, what he expected is that the value should never be expired, but according to the current code, it will immediately expired. ---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982 Could anybody have a look at this? ---
[GitHub] flink issue #6185: [FLINK-9619][YARN] Eagerly close the connection with task...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6185 CC @tillrohrmann ---
[GitHub] flink pull request #6185: [FLINK-9619][YARN] Eagerly close the connection wi...
GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/6185 [FLINK-9619][YARN] Eagerly close the connection with task manager when the container is completed ## What is the purpose of the change *We should always eagerly close the connection with task manager when the container is completed.* ## Brief change log - *Eagerly close the connection with task manager when the container is completed* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - No You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK-9619 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6185.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6185 commit a667ea120b0c2519ce45e5919c1377e897897d17 Author: sihuazhou Date: 2018-06-20T04:41:05Z Eagerly close the connection with task manager when the container is completed. ---
[GitHub] flink pull request #6151: [FLINK-9569] [avro] Fix confusing construction of ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6151#discussion_r196366033 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -105,41 +108,54 @@ /** The currently accessing thread, set and checked on debug level only. */ private transient volatile Thread currentThread; - // + // --- instantiation methods -- /** * Creates a new AvroSerializer for the type indicated by the given class. -* This constructor is intended to be used with {@link SpecificRecord} or reflection serializer. -* For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)} +* +* This constructor is expected to be used only with {@link GenericRecord}. +* For {@link SpecificRecord} or reflection serializer use {@link AvroSerializer#forNonGeneric(Class)}. +* +* @param schema the explicit schema to use for generic records. */ - public AvroSerializer(Class type) { - checkArgument(!isGenericRecord(type), - "For GenericData.Record use constructor with explicit schema."); - this.type = checkNotNull(type); - this.schemaString = null; + public static AvroSerializer forGeneric(Schema schema) { + return new AvroSerializer<>(GenericRecord.class, schema); --- End diff -- Should we check the schema here to make sure it is not null? ---
[GitHub] flink issue #6088: [FLINK-9417][ Distributed Coordination] Send heartbeat re...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6088 cc @tillrohrmann ---
[GitHub] flink issue #6174: [FLINK-9601][state]Try to reuse the snapshotData array as...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6174 @StefanRRichter Thanks for the review, I think that makes a lot of sense, will change it according to your suggestion. ---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6173#discussion_r195903383 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -110,7 +123,7 @@ /** * Map of state names to their corresponding restored state meta info. * -* +* --- End diff -- I think this should be reverted. ---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6173#discussion_r195903191 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java --- @@ -244,7 +271,14 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) { return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply( stateResponse -> { try { - return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent())); + if (!STATE_FACTORIES.containsKey(stateDescriptor.getClass())) { + String message = String.format("State %s is not supported by %s", + stateDescriptor.getClass(), this.getClass()); + throw new FlinkRuntimeException(message); + } + return STATE_FACTORIES + .get(stateDescriptor.getClass()) --- End diff -- Maybe we can merge the `containsKey()` and the `get()` into a single `get()`, this way we don't need to query the `STATE_FACTORIES` twice. ---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6173#discussion_r195903638 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1303,103 +1316,18 @@ private ColumnFamilyHandle createColumnFamily(String stateName) throws IOExcepti } @Override - protected InternalValueState createValueState( - TypeSerializer namespaceSerializer, - ValueStateDescriptor stateDesc) throws Exception { - - Tuple2> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBValueState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - this); - } - - @Override - protected InternalListState createListState( - TypeSerializer namespaceSerializer, - ListStateDescriptor stateDesc) throws Exception { - - Tuple2>> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBListState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getElementSerializer(), - this); - } - - @Override - protected InternalReducingState createReducingState( - TypeSerializer namespaceSerializer, - ReducingStateDescriptor stateDesc) throws Exception { - - Tuple2> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBReducingState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getReduceFunction(), - this); - } - - @Override - protected InternalAggregatingState createAggregatingState( - TypeSerializer namespaceSerializer, - AggregatingStateDescriptor stateDesc) throws Exception { - - Tuple2> registerResult = - 
tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBAggregatingState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getAggregateFunction(), - this); - } - - @Override - protected InternalFoldingState createFoldingState( + public IS createState( TypeSerializer namespaceSerializer, - FoldingStateDescriptor stateDesc) throws Exception { - - Tuple2> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBFoldingState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getFoldFunction(), - this); - } - - @Override - protected InternalMapState createMapState( - TypeSerializer namespaceSerializer, - MapStateDescriptor stateDesc) throws Exception { - - Tuple2>> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBMapState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), -
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6173#discussion_r195903619 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1303,103 +1316,18 @@ private ColumnFamilyHandle createColumnFamily(String stateName) throws IOExcepti } @Override - protected InternalValueState createValueState( - TypeSerializer namespaceSerializer, - ValueStateDescriptor stateDesc) throws Exception { - - Tuple2> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBValueState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - this); - } - - @Override - protected InternalListState createListState( - TypeSerializer namespaceSerializer, - ListStateDescriptor stateDesc) throws Exception { - - Tuple2>> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBListState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getElementSerializer(), - this); - } - - @Override - protected InternalReducingState createReducingState( - TypeSerializer namespaceSerializer, - ReducingStateDescriptor stateDesc) throws Exception { - - Tuple2> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBReducingState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getReduceFunction(), - this); - } - - @Override - protected InternalAggregatingState createAggregatingState( - TypeSerializer namespaceSerializer, - AggregatingStateDescriptor stateDesc) throws Exception { - - Tuple2> registerResult = - 
tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBAggregatingState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getAggregateFunction(), - this); - } - - @Override - protected InternalFoldingState createFoldingState( + public IS createState( TypeSerializer namespaceSerializer, - FoldingStateDescriptor stateDesc) throws Exception { - - Tuple2> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBFoldingState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getFoldFunction(), - this); - } - - @Override - protected InternalMapState createMapState( - TypeSerializer namespaceSerializer, - MapStateDescriptor stateDesc) throws Exception { - - Tuple2>> registerResult = - tryRegisterKvStateInformation(stateDesc, namespaceSerializer); - - return new RocksDBMapState<>( - registerResult.f0, - registerResult.f1.getNamespaceSerializer(), - registerResult.f1.getStateSerializer(), - stateDesc.getDefaultValue(), -
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6173#discussion_r195903979 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java --- @@ -23,7 +23,10 @@ /** * The {@code StateBinder} is used by {@link StateDescriptor} instances to create actual * {@link State} objects. + * + * @deprecated refactored to StateFactory in flink-runtime */ +@Deprecated @Internal public interface StateBinder { --- End diff -- Since it is internal, how about just removing it? ---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6173#discussion_r195903433 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -203,91 +216,16 @@ private boolean hasRegisteredState() { } @Override - public InternalValueState createValueState( - TypeSerializer namespaceSerializer, - ValueStateDescriptor stateDesc) throws Exception { - - StateTable stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); - return new HeapValueState<>( - stateTable, - keySerializer, - stateTable.getStateSerializer(), - stateTable.getNamespaceSerializer(), - stateDesc.getDefaultValue()); - } - - @Override - public InternalListState createListState( - TypeSerializer namespaceSerializer, - ListStateDescriptor stateDesc) throws Exception { - - StateTable> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); - return new HeapListState<>( - stateTable, - keySerializer, - stateTable.getStateSerializer(), - stateTable.getNamespaceSerializer(), - stateDesc.getDefaultValue()); - } - - @Override - public InternalReducingState createReducingState( - TypeSerializer namespaceSerializer, - ReducingStateDescriptor stateDesc) throws Exception { - - StateTable stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); - return new HeapReducingState<>( - stateTable, - keySerializer, - stateTable.getStateSerializer(), - stateTable.getNamespaceSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getReduceFunction()); - } - - @Override - public InternalAggregatingState createAggregatingState( - TypeSerializer namespaceSerializer, - AggregatingStateDescriptor stateDesc) throws Exception { - - StateTable stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); - return new HeapAggregatingState<>( - stateTable, - keySerializer, - stateTable.getStateSerializer(), - stateTable.getNamespaceSerializer(), - stateDesc.getDefaultValue(), - 
stateDesc.getAggregateFunction()); - } - - @Override - public InternalFoldingState createFoldingState( - TypeSerializer namespaceSerializer, - FoldingStateDescriptor stateDesc) throws Exception { - - StateTable stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); - return new HeapFoldingState<>( - stateTable, - keySerializer, - stateTable.getStateSerializer(), - stateTable.getNamespaceSerializer(), - stateDesc.getDefaultValue(), - stateDesc.getFoldFunction()); - } - - @Override - protected InternalMapState createMapState( - TypeSerializer namespaceSerializer, - MapStateDescriptor stateDesc) throws Exception { - - StateTable> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc); - - return new HeapMapState<>( - stateTable, - keySerializer, - stateTable.getStateSerializer(), - stateTable.getNamespaceSerializer(), - stateDesc.getDefaultValue()); + public IS createState( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception { + if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) { + String message = String.format("State %s is not supported by %s", + stateDesc.getClass(), this.getClass()); + throw new FlinkRuntimeException(message);
[GitHub] flink issue #6174: [FLINK-9601][state]Try to reuse the snapshotData array a...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6174 CC @StefanRRichter ---